Fix RocksDB and LMDB

This commit is contained in:
Johann Bauer 2016-11-06 18:16:43 +01:00
parent d34e0ed6d1
commit f6f674577f
2 changed files with 46 additions and 9 deletions

View File

@ -109,3 +109,20 @@ def int_to_bytes(value):
value, mod = divmod(value, 256) value, mod = divmod(value, 256)
mods.append(mod) mods.append(mod)
return bytes(reversed(mods)) return bytes(reversed(mods))
def increment_byte_string(bs):
bs = bytearray(bs)
incremented = False
for i in reversed(range(len(bs))):
if bs[i] < 0xff:
# This is easy
bs[i] += 1
incremented = True
break
# Otherwise we need to look at the previous character
bs[i] = 0
if not incremented:
# This can only happen if all characters are 0xff
bs = bytes([1]) + bs
return bytes(bs)

View File

@ -13,7 +13,7 @@ The abstraction needs to be improved to not heavily penalise LMDB.
import os import os
from functools import partial from functools import partial
from lib.util import subclasses from lib.util import subclasses, increment_byte_string
def open_db(name, db_engine): def open_db(name, db_engine):
@ -122,16 +122,24 @@ class RocksDB(Storage):
class Iterator(object): class Iterator(object):
def __init__(self, db, prefix, reverse): def __init__(self, db, prefix, reverse):
self.it = db.iteritems() self.it = db.iteritems()
if reverse: self.reverse = reverse
self.it = reversed(self.it)
self.prefix = prefix self.prefix = prefix
# Whether we are at the first item
self.first = True
def __iter__(self): def __iter__(self):
self.it.seek(self.prefix) prefix = self.prefix
if self.reverse:
prefix = increment_byte_string(prefix)
self.it = reversed(self.it)
self.it.seek(prefix)
return self return self
def __next__(self): def __next__(self):
k, v = self.it.__next__() k, v = self.it.__next__()
if self.first and self.reverse and not k.startswith(self.prefix):
k, v = self.it.__next__()
self.first = False
if not k.startswith(self.prefix): if not k.startswith(self.prefix):
# We're already ahead of the prefix # We're already ahead of the prefix
raise StopIteration raise StopIteration
@ -150,7 +158,7 @@ class LMDB(Storage):
cls.module = lmdb cls.module = lmdb
def open(self, name, create): def open(self, name, create):
self.env = cls.module.Environment('.', subdir=True, create=create, self.env = LMDB.module.Environment('.', subdir=True, create=create,
max_dbs=32, map_size=5 * 10 ** 10) max_dbs=32, map_size=5 * 10 ** 10)
self.db = self.env.open_db(create=create) self.db = self.env.open_db(create=create)
@ -174,17 +182,29 @@ class LMDB(Storage):
self.transaction.__enter__() self.transaction.__enter__()
self.db = db self.db = db
self.prefix = prefix self.prefix = prefix
self.reverse = reverse # FIXME self.reverse = reverse
self._stop = False
def __iter__(self): def __iter__(self):
self.iterator = LMDB.lmdb.Cursor(self.db, self.transaction) self.iterator = LMDB.module.Cursor(self.db, self.transaction)
self.iterator.set_range(self.prefix) prefix = self.prefix
if self.reverse:
# Go to the first value after the prefix
prefix = increment_byte_string(prefix)
self.iterator.set_range(prefix)
if not self.iterator.key().startswith(self.prefix) and self.reverse:
# Go back to the first item starting with the prefix
self.iterator.prev()
return self return self
def __next__(self): def __next__(self):
k, v = self.iterator.item() k, v = self.iterator.item()
if not k.startswith(self.prefix) or not self.iterator.next(): if not k.startswith(self.prefix) or self._stop:
# We're already ahead of the prefix # We're already ahead of the prefix
self.transaction.__exit__() self.transaction.__exit__()
raise StopIteration raise StopIteration
next = self.iterator.next \
if not self.reverse else self.iterator.prev
# Stop after the next value if we're at the end of the DB
self._stop = not next()
return k, v return k, v