db: change iterator api to be more loop-friendly.

This commit is contained in:
Christopher Jeffrey 2017-10-15 02:17:34 -07:00
parent 1c1e429383
commit 4d8ca8c16f
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
8 changed files with 183 additions and 228 deletions

View File

@ -1881,7 +1881,9 @@ ChainDB.prototype.pruneBlock = async function pruneBlock(entry) {
ChainDB.prototype.saveFlags = function saveFlags() {
const flags = ChainFlags.fromOptions(this.options);
return this.db.put(layout.O, flags.toRaw());
const batch = this.db.batch();
batch.put(layout.O, flags.toRaw());
return batch.write();
};
/**

View File

@ -17,37 +17,19 @@ DB.prototype.close = function close(callback) {
};
DB.prototype.get = function get(key, options, callback) {
if (this.bufferKeys && Buffer.isBuffer(key))
key = key.toString('hex');
this.level.get(key, options, callback);
this.level.get(toHex(key), options, callback);
};
DB.prototype.put = function put(key, value, options, callback) {
if (this.bufferKeys && Buffer.isBuffer(key))
key = key.toString('hex');
this.level.put(key, value, options, callback);
this.level.put(toHex(key), value, options, callback);
};
DB.prototype.del = function del(key, options, callback) {
if (this.bufferKeys && Buffer.isBuffer(key))
key = key.toString('hex');
this.level.del(key, options, callback);
this.level.del(toHex(key), options, callback);
};
DB.prototype.batch = function batch(ops, options, callback) {
if (!ops)
return new Batch(this);
if (this.bufferKeys) {
for (const op of ops) {
if (Buffer.isBuffer(op.key))
op.key = op.key.toString('hex');
}
}
this.level.batch(ops, options, callback);
return undefined;
DB.prototype.batch = function batch() {
return new Batch(this);
};
DB.prototype.iterator = function iterator(options) {
@ -65,17 +47,13 @@ function Batch(db) {
}
Batch.prototype.put = function put(key, value) {
if (this.db.bufferKeys && Buffer.isBuffer(key))
key = key.toString('hex');
this.batch.put(key, value);
this.batch.put(toHex(key), value);
this.hasOps = true;
return this;
};
Batch.prototype.del = function del(key) {
if (this.db.bufferKeys && Buffer.isBuffer(key))
key = key.toString('hex');
this.batch.del(key);
this.batch.del(toHex(key));
this.hasOps = true;
return this;
};
@ -93,20 +71,22 @@ Batch.prototype.clear = function clear() {
};
function Iterator(db, options) {
if (db.bufferKeys) {
if (Buffer.isBuffer(options.gt))
options.gt = options.gt.toString('hex');
if (Buffer.isBuffer(options.gte))
options.gte = options.gte.toString('hex');
if (Buffer.isBuffer(options.lt))
options.lt = options.lt.toString('hex');
if (Buffer.isBuffer(options.lte))
options.lte = options.lte.toString('hex');
}
options.keyAsBuffer = false;
const opt = {
gt: toHex(options.gt),
gte: toHex(options.gte),
lt: toHex(options.lt),
lte: toHex(options.lte),
limit: options.limit,
reverse: options.reverse,
keys: options.keys,
values: options.values,
keyAsBuffer: false,
valueAsBuffer: true
};
this.db = db;
this.iter = db.level.iterator(options);
this._end = false;
this.iter = db.level.iterator(opt);
this.ended = false;
}
Iterator.prototype.next = function next(callback) {
@ -114,7 +94,7 @@ Iterator.prototype.next = function next(callback) {
// Hack for level-js: it doesn't actually
// end iterators -- it keeps streaming keys
// and values.
if (this._end)
if (this.ended)
return;
if (err) {
@ -138,18 +118,22 @@ Iterator.prototype.next = function next(callback) {
};
Iterator.prototype.seek = function seek(key) {
if (this.db.bufferKeys && Buffer.isBuffer(key))
key = key.toString('hex');
this.iter.seek(key);
this.iter.seek(toHex(key));
};
Iterator.prototype.end = function end(callback) {
if (this._end) {
if (this.ended) {
callback(new Error('end() already called on iterator.'));
return;
}
this._end = true;
this.ended = true;
this.iter.end(callback);
};
function toHex(key) {
if (Buffer.isBuffer(key))
return key.toString('hex');
return key;
}
module.exports = DB;

View File

@ -8,8 +8,6 @@
'use strict';
const assert = require('assert');
const Lock = require('../utils/lock');
const co = require('../utils/co');
const LOW = Buffer.from([0x00]);
const HIGH = Buffer.from([0xff]);
@ -39,7 +37,6 @@ function LowlevelUp(backend, location, options) {
this.options = new LLUOptions(options);
this.backend = backend;
this.location = location;
this.locker = new Lock();
this.loading = false;
this.closing = false;
@ -89,27 +86,10 @@ LowlevelUp.prototype.init = function init() {
/**
* Open the database.
* @method
* @returns {Promise}
*/
LowlevelUp.prototype.open = async function open() {
const unlock = await this.locker.lock();
try {
return await this._open();
} finally {
unlock();
}
};
/**
* Open the database (without a lock).
* @method
* @private
* @returns {Promise}
*/
LowlevelUp.prototype._open = async function _open() {
if (this.loaded)
throw new Error('Database is already open.');
@ -131,27 +111,10 @@ LowlevelUp.prototype._open = async function _open() {
/**
* Close the database.
* @method
* @returns {Promise}
*/
LowlevelUp.prototype.close = async function close() {
const unlock = await this.locker.lock();
try {
return await this._close();
} finally {
unlock();
}
};
/**
* Close the database (without a lock).
* @method
* @private
* @returns {Promise}
*/
LowlevelUp.prototype._close = async function _close() {
if (!this.loaded)
throw new Error('Database is already closed.');
@ -180,7 +143,7 @@ LowlevelUp.prototype._close = async function _close() {
LowlevelUp.prototype.load = function load() {
return new Promise((resolve, reject) => {
this.binding.open(this.options, co.wrap(resolve, reject));
this.binding.open(this.options, wrap(resolve, reject));
});
};
@ -192,7 +155,7 @@ LowlevelUp.prototype.load = function load() {
LowlevelUp.prototype.unload = function unload() {
return new Promise((resolve, reject) => {
this.binding.close(co.wrap(resolve, reject));
this.binding.close(wrap(resolve, reject));
});
};
@ -213,7 +176,7 @@ LowlevelUp.prototype.destroy = function destroy() {
return;
}
this.backend.destroy(this.location, co.wrap(resolve, reject));
this.backend.destroy(this.location, wrap(resolve, reject));
});
};
@ -234,7 +197,7 @@ LowlevelUp.prototype.repair = function repair() {
return;
}
this.backend.repair(this.location, co.wrap(resolve, reject));
this.backend.repair(this.location, wrap(resolve, reject));
});
};
@ -253,7 +216,7 @@ LowlevelUp.prototype.backup = function backup(path) {
reject(new Error('Database is closed.'));
return;
}
this.binding.backup(path, co.wrap(resolve, reject));
this.binding.backup(path, wrap(resolve, reject));
});
};
@ -299,7 +262,7 @@ LowlevelUp.prototype.put = function put(key, value) {
reject(new Error('Database is closed.'));
return;
}
this.binding.put(key, value, co.wrap(resolve, reject));
this.binding.put(key, value, wrap(resolve, reject));
});
};
@ -315,30 +278,20 @@ LowlevelUp.prototype.del = function del(key) {
reject(new Error('Database is closed.'));
return;
}
this.binding.del(key, co.wrap(resolve, reject));
this.binding.del(key, wrap(resolve, reject));
});
};
/**
* Create an atomic batch.
* @param {Array?} ops
* @returns {Batch}
*/
LowlevelUp.prototype.batch = function batch(ops) {
if (!ops) {
if (!this.loaded)
throw new Error('Database is closed.');
return new Batch(this);
}
LowlevelUp.prototype.batch = function batch() {
if (!this.loaded)
throw new Error('Database is closed.');
return new Promise((resolve, reject) => {
if (!this.loaded) {
reject(new Error('Database is closed.'));
return;
}
this.binding.batch(ops, co.wrap(resolve, reject));
});
return new Batch(this);
};
/**
@ -389,7 +342,7 @@ LowlevelUp.prototype.approximateSize = function approximateSize(start, end) {
return;
}
this.binding.approximateSize(start, end, co.wrap(resolve, reject));
this.binding.approximateSize(start, end, wrap(resolve, reject));
});
};
@ -418,7 +371,7 @@ LowlevelUp.prototype.compactRange = function compactRange(start, end) {
return;
}
this.binding.compactRange(start, end, co.wrap(resolve, reject));
this.binding.compactRange(start, end, wrap(resolve, reject));
});
};
@ -442,7 +395,6 @@ LowlevelUp.prototype.has = async function has(key) {
*/
LowlevelUp.prototype.range = async function range(options) {
const items = [];
const parse = options.parse;
const iter = this.iterator({
@ -452,23 +404,28 @@ LowlevelUp.prototype.range = async function range(options) {
values: true
});
for (;;) {
let item = await iter.next();
const items = [];
if (!item)
break;
while (await iter.next()) {
const {key, value} = iter;
if (parse) {
let item;
try {
item = parse(item.key, item.value);
item = parse(key, value);
} catch (e) {
await iter.end();
throw e;
}
if (item)
items.push(item);
continue;
}
if (item)
items.push(item);
items.push(new IteratorItem(key, value));
}
return items;
@ -482,7 +439,6 @@ LowlevelUp.prototype.range = async function range(options) {
*/
LowlevelUp.prototype.keys = async function keys(options) {
const items = [];
const parse = options.parse;
const iter = this.iterator({
@ -492,13 +448,10 @@ LowlevelUp.prototype.keys = async function keys(options) {
values: false
});
for (;;) {
const item = await iter.next();
const items = [];
if (!item)
break;
let key = item.key;
while (await iter.next()) {
let {key} = iter;
if (parse) {
try {
@ -534,13 +487,8 @@ LowlevelUp.prototype.values = async function values(options) {
values: true
});
for (;;) {
const item = await iter.next();
if (!item)
break;
let value = item.value;
while (await iter.next()) {
let {value} = iter;
if (parse) {
try {
@ -589,18 +537,20 @@ LowlevelUp.prototype.dump = async function dump() {
*/
LowlevelUp.prototype.checkVersion = async function checkVersion(key, version) {
let data = await this.get(key);
const data = await this.get(key);
if (!data) {
data = Buffer.allocUnsafe(4);
data.writeUInt32LE(version, 0, true);
await this.put(key, data);
const value = Buffer.allocUnsafe(4);
value.writeUInt32LE(version, 0, true);
const batch = this.batch();
batch.put(key, value);
await batch.write();
return;
}
data = data.readUInt32LE(0, true);
const num = data.readUInt32LE(0, true);
if (data !== version)
if (num !== version)
throw new Error(VERSION_ERROR);
};
@ -625,22 +575,21 @@ LowlevelUp.prototype.clone = async function clone(path) {
await tmp.open();
let batch = tmp.batch();
let total = 0;
const iter = this.iterator({
keys: true,
values: true
});
for (;;) {
const item = await iter.next();
let batch = tmp.batch();
let total = 0;
if (!item)
break;
while (await iter.next()) {
const {key, value} = iter;
batch.put(item.key, item.value);
total += item.value.length;
batch.put(key, value);
total += key.length + 80;
total += value.length + 80;
if (total >= hwm) {
total = 0;
@ -705,7 +654,7 @@ Batch.prototype.del = function del(key) {
Batch.prototype.write = function write() {
return new Promise((resolve, reject) => {
this.batch.write(co.wrap(resolve, reject));
this.batch.write(wrap(resolve, reject));
});
};
@ -727,10 +676,12 @@ Batch.prototype.clear = function clear() {
*/
function Iterator(db, options) {
options = new IteratorOptions(options);
options.keyAsBuffer = db.options.bufferKeys;
this.iter = db.db.iterator(options);
this.options = new IteratorOptions(options);
this.options.keyAsBuffer = db.options.bufferKeys;
this.iter = db.db.iterator(this.options);
this.key = null;
this.value = null;
this.valid = true;
}
/**
@ -742,16 +693,36 @@ Iterator.prototype.next = function next() {
return new Promise((resolve, reject) => {
this.iter.next((err, key, value) => {
if (err) {
this.iter.end(() => reject(err));
this.iter.end(() => {
this.key = null;
this.value = null;
this.valid = false;
reject(err);
});
return;
}
if (key === undefined && value === undefined) {
this.iter.end(co.wrap(resolve, reject));
this.iter.end((err) => {
if (err) {
reject(err);
return;
}
this.key = null;
this.value = null;
this.valid = false;
resolve(false);
});
return;
}
resolve(new IteratorItem(key, value));
this.key = key;
this.value = value;
resolve(true);
});
});
};
@ -772,7 +743,18 @@ Iterator.prototype.seek = function seek(key) {
Iterator.prototype.end = function end() {
return new Promise((resolve, reject) => {
this.iter.end(co.wrap(resolve, reject));
this.iter.end((err) => {
if (err) {
reject(err);
return;
}
this.key = null;
this.value = null;
this.valid = false;
resolve();
});
});
};
@ -924,6 +906,8 @@ LLUOptions.prototype.fromOptions = function fromOptions(options) {
function IteratorOptions(options) {
this.gte = null;
this.lte = null;
this.gt = null;
this.lt = null;
this.keys = true;
this.values = false;
this.fillCache = false;
@ -959,6 +943,16 @@ IteratorOptions.prototype.fromOptions = function fromOptions(options) {
this.lte = options.lte;
}
if (options.gt != null) {
assert(Buffer.isBuffer(options.gt) || typeof options.gt === 'string');
this.gt = options.gt;
}
if (options.lt != null) {
assert(Buffer.isBuffer(options.lt) || typeof options.lt === 'string');
this.lt = options.lt;
}
if (options.keys != null) {
assert(typeof options.keys === 'boolean');
this.keys = options.keys;
@ -979,11 +973,6 @@ IteratorOptions.prototype.fromOptions = function fromOptions(options) {
this.keyAsBuffer = options.keyAsBuffer;
}
if (options.valueAsBuffer != null) {
assert(typeof options.valueAsBuffer === 'boolean');
this.valueAsBuffer = options.valueAsBuffer;
}
if (options.reverse != null) {
assert(typeof options.reverse === 'boolean');
this.reverse = options.reverse;
@ -1014,6 +1003,16 @@ function isNotFound(err) {
|| /not\s*found/i.test(err.message);
}
function wrap(resolve, reject) {
return function(err, result) {
if (err) {
reject(err);
return;
}
resolve(result);
};
}
VERSION_ERROR = 'Warning:'
+ ' Your database does not match the current database version.'
+ ' This is likely because the database layout or serialization'

View File

@ -352,14 +352,11 @@ WalletDB.prototype.watch = async function watch() {
let hashes = 0;
let outpoints = 0;
for (;;) {
const item = await iter.next();
if (!item)
break;
while (await iter.next()) {
const {key} = iter;
try {
const data = layout.pp(item.key);
const data = layout.pp(key);
this.filter.add(data, 'hex');
} catch (e) {
await iter.end();
@ -374,14 +371,11 @@ WalletDB.prototype.watch = async function watch() {
lte: layout.o(encoding.HIGH_HASH, 0xffffffff)
});
for (;;) {
const item = await iter.next();
if (!item)
break;
while (await iter.next()) {
const {key} = iter;
try {
const [hash, index] = layout.oo(item.key);
const [hash, index] = layout.oo(key);
const outpoint = new Outpoint(hash, index);
const data = outpoint.toRaw();
this.filter.add(data);
@ -607,14 +601,11 @@ WalletDB.prototype.wipe = async function wipe() {
const batch = this.db.batch();
let total = 0;
for (;;) {
const item = await iter.next();
if (!item)
break;
while (await iter.next()) {
const {key} = iter;
try {
switch (item.key[0]) {
switch (key[0]) {
case 0x62: // b
case 0x63: // c
case 0x65: // e
@ -622,7 +613,7 @@ WalletDB.prototype.wipe = async function wipe() {
case 0x6f: // o
case 0x68: // h
case 0x52: // R
batch.del(item.key);
batch.del(key);
total++;
break;
}
@ -660,14 +651,14 @@ WalletDB.prototype.getDepth = async function getDepth() {
limit: 1
});
const item = await iter.next();
if (!item)
if (!await iter.next())
return 1;
const {key} = iter;
await iter.end();
const depth = layout.ww(item.key);
const depth = layout.ww(key);
return depth + 1;
};
@ -1578,14 +1569,9 @@ WalletDB.prototype.resetState = async function resetState(tip, marked) {
values: false
});
for (;;) {
const item = await iter.next();
if (!item)
break;
while (await iter.next()) {
try {
batch.del(item.key);
batch.del(iter.key);
} catch (e) {
await iter.end();
throw e;
@ -1849,15 +1835,12 @@ WalletDB.prototype.revert = async function revert(target) {
let total = 0;
for (;;) {
const item = await iter.next();
if (!item)
break;
while (await iter.next()) {
const {key, value} = iter;
try {
const height = layout.bb(item.key);
const block = BlockMapRecord.fromRaw(height, item.value);
const height = layout.bb(key);
const block = BlockMapRecord.fromRaw(height, value);
const txs = block.toArray();
total += txs.length;

View File

@ -83,14 +83,10 @@ async function updateEndian() {
values: true
});
for (;;) {
const item = await iter.next();
if (!item)
break;
batch.del(item.key);
batch.put(makeKey(item.key), item.value);
while (await iter.next()) {
const {key, value} = iter;
batch.del(key);
batch.put(makeKey(key), value);
total++;
}

View File

@ -120,14 +120,10 @@ async function reserializeCoins() {
values: true
});
for (;;) {
const item = await iter.next();
if (!item)
break;
const hash = item.key.toString('hex', 1, 33);
const old = OldCoins.fromRaw(item.value, hash);
while (await iter.next()) {
const {key, value} = iter;
const hash = key.toString('hex', 1, 33);
const old = OldCoins.fromRaw(value, hash);
const coins = new Coins();
coins.version = old.version;
@ -153,7 +149,7 @@ async function reserializeCoins() {
coins.cleanup();
batch.put(item.key, coins.toRaw());
batch.put(key, coins.toRaw());
if (++total % 100000 === 0)
console.log('Reserialized %d coins.', total);

View File

@ -274,13 +274,10 @@ async function cleanupIndex() {
let batch = db.batch();
let total = 0;
for (;;) {
const item = await iter.next();
while (await iter.next()) {
const {key} = iter;
if (!item)
break;
batch.del(item.key);
batch.del(key);
if (++total % 10000 === 0) {
console.log('Cleaned up %d undo records.', total);

View File

@ -61,15 +61,13 @@ async function updatePathMap() {
console.log('Migrating path map.');
for (;;) {
const item = await iter.next();
if (!item)
break;
while (await iter.next()) {
const {key, value} = iter;
total++;
const hash = layout.pp(item.key);
const oldPaths = parsePaths(item.value, hash);
const hash = layout.pp(key);
const oldPaths = parsePaths(value, hash);
const keys = Object.keys(oldPaths);
for (let i = 0; i < keys.length; i++) {
@ -91,7 +89,7 @@ async function updatePathMap() {
batch.put(layout.P(key, hash), path.toRaw());
}
batch.put(item.key, serializeWallets(keys.sort()));
batch.put(key, serializeWallets(keys.sort()));
}
console.log('Migrated %d paths.', total);