db: improve iterator perf.

This commit is contained in:
Christopher Jeffrey 2017-10-15 18:29:44 -07:00
parent 4d8ca8c16f
commit bfde70f2ab
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD

View File

@ -42,8 +42,8 @@ function LowlevelUp(backend, location, options) {
this.closing = false;
this.loaded = false;
this.db = null;
this.binding = null;
this.leveldown = false;
this.init();
}
@ -58,7 +58,6 @@ LowlevelUp.prototype.init = function init() {
const Backend = this.backend;
let db = new Backend(this.location);
let binding = db;
// Stay as close to the metal as possible.
// We want to make calls to C++ directly.
@ -73,15 +72,15 @@ LowlevelUp.prototype.init = function init() {
// Go deeper.
db = db.db;
binding = db;
}
// A lower-level binding.
if (db.binding)
binding = db.binding;
this.db = db;
this.binding = binding;
if (db.binding) {
this.binding = db.binding;
this.leveldown = db !== db.binding;
} else {
this.binding = db;
}
};
/**
@ -395,8 +394,6 @@ LowlevelUp.prototype.has = async function has(key) {
*/
LowlevelUp.prototype.range = async function range(options) {
const parse = options.parse;
const iter = this.iterator({
gte: options.gte,
lte: options.lte,
@ -406,27 +403,15 @@ LowlevelUp.prototype.range = async function range(options) {
const items = [];
while (await iter.next()) {
const {key, value} = iter;
if (parse) {
let item;
try {
item = parse(key, value);
} catch (e) {
await iter.end();
throw e;
}
await iter.each((key, value) => {
if (options.parse) {
const item = options.parse(key, value);
if (item)
items.push(item);
continue;
} else {
items.push(new IteratorItem(key, value));
}
items.push(new IteratorItem(key, value));
}
});
return items;
};
@ -439,8 +424,6 @@ LowlevelUp.prototype.range = async function range(options) {
*/
LowlevelUp.prototype.keys = async function keys(options) {
const parse = options.parse;
const iter = this.iterator({
gte: options.gte,
lte: options.lte,
@ -450,21 +433,11 @@ LowlevelUp.prototype.keys = async function keys(options) {
const items = [];
while (await iter.next()) {
let {key} = iter;
if (parse) {
try {
key = parse(key);
} catch (e) {
await iter.end();
throw e;
}
}
if (key)
items.push(key);
}
await iter.each((key) => {
if (options.parse)
key = options.parse(key);
items.push(key);
});
return items;
};
@ -477,9 +450,6 @@ LowlevelUp.prototype.keys = async function keys(options) {
*/
LowlevelUp.prototype.values = async function values(options) {
const items = [];
const parse = options.parse;
const iter = this.iterator({
gte: options.gte,
lte: options.lte,
@ -487,21 +457,13 @@ LowlevelUp.prototype.values = async function values(options) {
values: true
});
while (await iter.next()) {
let {value} = iter;
const items = [];
if (parse) {
try {
value = parse(value);
} catch (e) {
await iter.end();
throw e;
}
}
if (value)
items.push(value);
}
await iter.each((value) => {
if (options.parse)
value = options.parse(value);
items.push(value);
});
return items;
};
@ -565,9 +527,9 @@ LowlevelUp.prototype.clone = async function clone(path) {
if (!this.loaded)
throw new Error('Database is closed.');
const options = new LLUOptions(this.options);
const hwm = 256 << 20;
const options = new LLUOptions(this.options);
options.createIfMissing = true;
options.errorIfExists = true;
@ -593,6 +555,7 @@ LowlevelUp.prototype.clone = async function clone(path) {
if (total >= hwm) {
total = 0;
try {
await batch.write();
} catch (e) {
@ -600,6 +563,7 @@ LowlevelUp.prototype.clone = async function clone(path) {
await tmp.close();
throw e;
}
batch = tmp.batch();
}
}
@ -678,51 +642,141 @@ Batch.prototype.clear = function clear() {
function Iterator(db, options) {
this.options = new IteratorOptions(options);
this.options.keyAsBuffer = db.options.bufferKeys;
this.iter = db.db.iterator(this.options);
this.iter = db.binding.iterator(this.options);
this.leveldown = db.leveldown;
this.cache = [];
this.finished = false;
this.key = null;
this.value = null;
this.valid = true;
}
/**
* Clean up iterator.
* @private
*/
Iterator.prototype.cleanup = function cleanup() {
this.cache = [];
this.finished = true;
this.key = null;
this.value = null;
this.valid = false;
};
/**
* For each.
* @returns {Promise}
*/
Iterator.prototype.each = async function each(cb) {
assert(this.valid);
const {keys, values} = this.options;
while (!this.finished) {
await this.read();
while (this.cache.length > 0) {
const key = this.cache.pop();
const value = this.cache.pop();
let result = null;
try {
if (keys && values)
result = cb(key, value);
else if (keys)
result = cb(key);
else if (values)
result = cb(value);
else
assert(false);
if (result instanceof Promise)
result = await result;
} catch (e) {
await this.end();
throw e;
}
if (result === false) {
await this.end();
break;
}
}
}
};
/**
* Seek to the next key.
* @returns {Promise}
*/
Iterator.prototype.next = function next() {
Iterator.prototype.next = async function next() {
assert(this.valid);
if (!this.finished) {
if (this.cache.length === 0)
await this.read();
}
if (this.cache.length > 0) {
this.key = this.cache.pop();
this.value = this.cache.pop();
return true;
}
assert(this.finished);
this.cleanup();
return false;
};
/**
* Seek to the next key (buffer values).
* @private
* @returns {Promise}
*/
Iterator.prototype.read = function read() {
return new Promise((resolve, reject) => {
this.iter.next((err, key, value) => {
if (!this.leveldown) {
this.iter.next((err, key, value) => {
if (err) {
this.cleanup();
this.iter.end(() => reject(err));
return;
}
if (key === undefined && value === undefined) {
this.cleanup();
this.iter.end(wrap(resolve, reject));
return;
}
this.cache = [value, key];
resolve();
});
return;
}
this.iter.next((err, cache, finished) => {
if (err) {
this.iter.end(() => {
this.key = null;
this.value = null;
this.valid = false;
reject(err);
});
this.cleanup();
this.iter.end(() => reject(err));
return;
}
if (key === undefined && value === undefined) {
this.iter.end((err) => {
if (err) {
reject(err);
return;
}
this.cache = cache;
this.finished = finished;
this.key = null;
this.value = null;
this.valid = false;
resolve(false);
});
return;
}
this.key = key;
this.value = value;
resolve(true);
resolve();
});
});
};
@ -733,6 +787,7 @@ Iterator.prototype.next = function next() {
*/
Iterator.prototype.seek = function seek(key) {
assert(this.valid);
this.iter.seek(key);
};
@ -743,18 +798,8 @@ Iterator.prototype.seek = function seek(key) {
Iterator.prototype.end = function end() {
return new Promise((resolve, reject) => {
this.iter.end((err) => {
if (err) {
reject(err);
return;
}
this.key = null;
this.value = null;
this.valid = false;
resolve();
});
this.cleanup();
this.iter.end(wrap(resolve, reject));
});
};