Merge pull request #2 from bauerj/db-engine
Add support for RocksDB and LMDB
commit 816bdf23ff
AUTHORS | 3

@@ -1 +1,2 @@
 Neil Booth: creator and maintainer
+Johann Bauer: backend DB abstraction
HOWTO.rst | 13

@@ -29,6 +29,19 @@ metadata comes to just over 17GB. Leveldb needs a bit more for brief
 periods, and the block chain is only getting longer, so I would
 recommend having at least 30-40GB free space.
 
+Database Engine
+===============
+
+You can choose between RocksDB, LevelDB and LMDB to store transaction
+information on disk.  Currently RocksDB seems to be the fastest, with
+LevelDB about 10% slower.  LMDB seems to be the slowest, though that may
+be down to implementation or configuration details.
+
+You will need to install one of:
+
++ `plyvel <https://plyvel.readthedocs.io/en/latest/installation.html>`_ for LevelDB
++ `pyrocksdb <http://pyrocksdb.readthedocs.io/en/v0.4/installation.html>`_ for RocksDB
++ `lmdb <https://lmdb.readthedocs.io/en/release/#installation-unix>`_ for LMDB
 
 Running
 =======
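To sanity-check which backends are usable before picking one, a quick probe like the following works; it is illustrative only (not part of this commit), and the module names match the bindings listed above:

```python
# Probe which of the three supported bindings are importable, so you know
# which DB_ENGINE values this install can actually serve.
import importlib

for engine, module in (("leveldb", "plyvel"),
                       ("rocksdb", "rocksdb"),
                       ("lmdb", "lmdb")):
    try:
        importlib.import_module(module)
        print("DB_ENGINE={} is available".format(engine))
    except ImportError:
        print("{} missing; DB_ENGINE={} unavailable".format(module, engine))
```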
@@ -31,18 +31,21 @@ bit bigger than the combined cache size, because of Python overhead and
 also because leveldb can consume quite a lot of memory during UTXO
 flushing.  So these are rough numbers only:
 
 HIST_MB - amount of history cache, in MB, to retain before flushing to
           disk.  Default is 250; probably no benefit being much larger
           as history is append-only and not searched.
 
 UTXO_MB - amount of UTXO and history cache, in MB, to retain before
           flushing to disk.  Default is 1000.  This may be too large
           for small boxes or too small for machines with lots of RAM.
           Larger caches generally perform better as there is
           significant searching of the UTXO cache during indexing.
           However, I don't see much benefit in my tests pushing this
           too high, and in fact performance begins to fall.  My
           machine has 24GB RAM; the slowdown is probably because of
           leveldb caching and Python GC effects.  However this may be
           very dependent on hardware and you may have different
           results.
+
+DB_ENGINE - database engine for the transaction database: one of rocksdb,
+            leveldb or lmdb (default leveldb)
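As a rough worked example of the sizing advice above (the 30% overhead factor is an assumption, not a number from this commit):

```python
# Back-of-envelope RAM budget: the daemon runs a bit bigger than
# HIST_MB + UTXO_MB, so pad the combined cache size by an assumed factor.
hist_mb = 250    # HIST_MB default
utxo_mb = 1000   # UTXO_MB default
overhead = 1.3   # assumed Python + leveldb overhead
print("plan for roughly {:.0f} MB".format((hist_mb + utxo_mb) * overhead))
```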
@@ -10,13 +10,12 @@ from bisect import bisect_left
 from collections import defaultdict, namedtuple
 from functools import partial
 
-import plyvel
-
 from server.cache import FSCache, UTXOCache, NO_CACHE_ENTRY
 from server.daemon import DaemonError
 from lib.hash import hash_to_str
 from lib.script import ScriptPubKey
 from lib.util import chunks, LoggedClass
+from server.storage import LMDB, RocksDB, LevelDB, NoDatabaseException
 
 
 def formatted_time(t):
@@ -142,7 +141,7 @@ class BlockProcessor(LoggedClass):
         self.first_sync = True
 
         # Open DB and metadata files. Record some of its state.
-        self.db = self.open_db(self.coin)
+        self.db = self.open_db(self.coin, env.db_engine)
         self.tx_count = self.db_tx_count
         self.height = self.db_height
         self.tip = self.db_tip
@@ -205,7 +204,7 @@ class BlockProcessor(LoggedClass):
                 await self.handle_chain_reorg()
                 self.caught_up = False
                 break
             await asyncio.sleep(0)  # Yield
 
             if self.height != self.daemon.cached_height():
                 continue
@@ -244,6 +243,7 @@
         '''Return the list of hashes to back up because of a reorg.
 
         The hashes are returned in order of increasing height.'''
+
         def match_pos(hashes1, hashes2):
             for n, (hash1, hash2) in enumerate(zip(hashes1, hashes2)):
                 if hash1 == hash2:
@@ -273,17 +273,22 @@ class BlockProcessor(LoggedClass):
 
         return self.fs_cache.block_hashes(start, count)
 
-    def open_db(self, coin):
+    def open_db(self, coin, db_engine):
         db_name = '{}-{}'.format(coin.NAME, coin.NET)
+        db_engine_class = {
+            "leveldb": LevelDB,
+            "rocksdb": RocksDB,
+            "lmdb": LMDB
+        }[db_engine.lower()]
         try:
-            db = plyvel.DB(db_name, create_if_missing=False,
-                           error_if_exists=False, compression=None)
-        except:
-            db = plyvel.DB(db_name, create_if_missing=True,
-                           error_if_exists=True, compression=None)
-            self.logger.info('created new database {}'.format(db_name))
+            db = db_engine_class(db_name, create_if_missing=False,
+                                 error_if_exists=False, compression=None)
+        except NoDatabaseException:
+            db = db_engine_class(db_name, create_if_missing=True,
+                                 error_if_exists=True, compression=None)
+            self.logger.info('created new {} database {}'
+                             .format(db_engine, db_name))
         else:
-            self.logger.info('successfully opened database {}'.format(db_name))
+            self.logger.info('successfully opened {} database {}'
+                             .format(db_engine, db_name))
         self.read_state(db)
 
         return db
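One note on the dict dispatch introduced here: an unrecognised DB_ENGINE value surfaces as a bare KeyError. A hedged sketch of a friendlier wrapper (not part of the commit; `engine_class` is a hypothetical helper name):

```python
# Hypothetical helper: same dict dispatch as open_db(), with a clearer
# error message for typos in DB_ENGINE.
from server.storage import LMDB, RocksDB, LevelDB

def engine_class(db_engine):
    engines = {"leveldb": LevelDB, "rocksdb": RocksDB, "lmdb": LMDB}
    try:
        return engines[db_engine.lower()]
    except KeyError:
        raise ValueError("unsupported DB_ENGINE: {!r}".format(db_engine))
```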
@@ -312,7 +317,7 @@ class BlockProcessor(LoggedClass):
         '''
         if self.flush_count < self.utxo_flush_count:
             raise ChainError('DB corrupt: flush_count < utxo_flush_count')
-        with self.db.write_batch(transaction=True) as batch:
+        with self.db.write_batch() as batch:
             if self.flush_count > self.utxo_flush_count:
                 self.logger.info('DB shut down uncleanly. Scanning for '
                                  'excess history flushes...')
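The point of dropping the plyvel-only `transaction=True` argument is that `write_batch()` is now part of the Storage contract: every backend returns a context manager that commits only on a clean exit. A minimal usage sketch against the new interface (the database path here is made up):

```python
from server.storage import LevelDB

db = LevelDB("example-db", create_if_missing=True)  # hypothetical path
with db.write_batch() as batch:
    batch.put(b"k1", b"v1")   # buffered...
    batch.put(b"k2", b"v2")
# ...and committed atomically here, on clean exit
print(db.get(b"k1"))  # b'v1'
```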
@@ -410,7 +415,7 @@ class BlockProcessor(LoggedClass):
         if self.height > self.db_height:
             self.fs_cache.flush(self.height, self.tx_count)
 
-        with self.db.write_batch(transaction=True) as batch:
+        with self.db.write_batch() as batch:
             # History first - fast and frees memory. Flush state last
             # as it reads the wall time.
             if self.height > self.db_height:
@@ -656,7 +661,7 @@ class BlockProcessor(LoggedClass):
             if not tx.is_coinbase:
                 for txin in reversed(tx.inputs):
                     n -= 33
-                    undo_item = undo_info[n:n+33]
+                    undo_item = undo_info[n:n + 33]
                     put_utxo(txin.prev_hash + pack('<H', txin.prev_idx),
                              undo_item)
                     hash168s.add(undo_item[:21])
@@ -703,13 +708,13 @@ class BlockProcessor(LoggedClass):
         prefix = b'u' + hash168
         utxos = []
         for k, v in self.db.iterator(prefix=prefix):
-            (tx_pos, ) = unpack('<H', k[-2:])
+            (tx_pos,) = unpack('<H', k[-2:])
 
             for n in range(0, len(v), 12):
                 if limit == 0:
                     return
-                (tx_num, ) = unpack('<I', v[n:n+4])
-                (value, ) = unpack('<Q', v[n+4:n+12])
+                (tx_num,) = unpack('<I', v[n:n + 4])
+                (value,) = unpack('<Q', v[n + 4:n + 12])
                 tx_hash, height = self.get_tx_hash(tx_num)
                 yield UTXO(tx_num, tx_pos, tx_hash, height, value)
                 limit -= 1
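For reference, the record layout these unpack calls assume is 2 trailing key bytes for tx_pos plus 12-byte value entries (4-byte tx_num, 8-byte amount), all little-endian. A self-contained round-trip check:

```python
# Round-trip the 12-byte UTXO value entry format used above:
# '<I' tx_num (4 bytes) followed by '<Q' value in satoshis (8 bytes).
from struct import pack, unpack

entry = pack('<I', 123456) + pack('<Q', 5000000000)
(tx_num,) = unpack('<I', entry[0:4])
(value,) = unpack('<Q', entry[4:12])
assert (tx_num, value) == (123456, 5000000000)
```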
@@ -32,6 +32,7 @@ class Env(LoggedClass):
         self.banner_file = self.default('BANNER_FILE', None)
         # The electrum client takes the empty string as unspecified
         self.donation_address = self.default('DONATION_ADDRESS', '')
+        self.db_engine = self.default('DB_ENGINE', 'leveldb')
 
     def default(self, envvar, default):
         return environ.get(envvar, default)
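So the new knob is wired through the environment: `Env.default()` falls back to `'leveldb'` when DB_ENGINE is unset, and `open_db()` lower-cases the value before dispatch. Illustratively:

```python
# How the setting travels: environment -> Env.db_engine -> open_db().
import os

os.environ["DB_ENGINE"] = "RocksDB"             # e.g. set in the service file
engine = os.environ.get("DB_ENGINE", "leveldb")
print(engine.lower())                           # 'rocksdb' -> RocksDB class
```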
server/storage.py | 152 (new file)

@@ -0,0 +1,152 @@
import os

class Storage(object):
    def __init__(self, name, create_if_missing=False, error_if_exists=False,
                 compression=None):
        if not create_if_missing and not os.path.exists(name):
            raise NoDatabaseException

    def get(self, key):
        raise NotImplementedError()

    def put(self, key, value):
        raise NotImplementedError()

    def write_batch(self):
        """
        Returns a context manager that provides `put` and `delete`.

        Changes should only be committed when the context manager
        closes without an exception.
        """
        raise NotImplementedError()

    def iterator(self, prefix=b""):
        """
        Returns an iterator that yields (key, value) pairs from the
        database, sorted by key.

        If `prefix` is set, only keys starting with `prefix` will be
        included.
        """
        raise NotImplementedError()


class NoDatabaseException(Exception):
    pass

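The base class doubles as the interface contract for the three backends below. For unit tests a throwaway in-memory implementation is enough; this MemoryDB is illustrative only, not part of the commit:

```python
# Illustrative in-memory Storage for tests; writes are applied immediately,
# so unlike the real backends the batch is not atomic on failure.
class MemoryDB(Storage):
    def __init__(self, name, create_if_missing=False, error_if_exists=False,
                 compression=None):
        self.store = {}  # deliberately skip the on-disk existence check

    def get(self, key):
        return self.store.get(key)

    def put(self, key, value):
        self.store[key] = value

    def delete(self, key):
        self.store.pop(key, None)

    def write_batch(self):
        db = self

        class Batch:
            def __enter__(self):
                return db          # put/delete go straight to the dict

            def __exit__(self, exc_type, exc_val, exc_tb):
                return False

        return Batch()

    def iterator(self, prefix=b""):
        return iter(sorted(item for item in self.store.items()
                           if item[0].startswith(prefix)))
```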
class LevelDB(Storage):
    def __init__(self, name, create_if_missing=False, error_if_exists=False,
                 compression=None):
        super().__init__(name, create_if_missing, error_if_exists,
                         compression)
        import plyvel
        self.db = plyvel.DB(name, create_if_missing=create_if_missing,
                            error_if_exists=error_if_exists,
                            compression=compression)

    def get(self, key):
        return self.db.get(key)

    def write_batch(self):
        return self.db.write_batch(transaction=True)

    def iterator(self, prefix=b""):
        return self.db.iterator(prefix=prefix)

    def put(self, key, value):
        self.db.put(key, value)


class RocksDB(Storage):
    rocksdb = None

    def __init__(self, name, create_if_missing=False, error_if_exists=False,
                 compression=None):
        super().__init__(name, create_if_missing, error_if_exists,
                         compression)
        import rocksdb
        RocksDB.rocksdb = rocksdb
        if not compression:
            compression = "no"
        compression = getattr(rocksdb.CompressionType,
                              compression + "_compression")
        self.db = rocksdb.DB(name,
                             rocksdb.Options(create_if_missing=create_if_missing,
                                             compression=compression,
                                             target_file_size_base=33554432,
                                             max_open_files=1024))

    def get(self, key):
        return self.db.get(key)

    class WriteBatch(object):
        def __init__(self, db):
            self.batch = RocksDB.rocksdb.WriteBatch()
            self.db = db

        def __enter__(self):
            return self.batch

        def __exit__(self, exc_type, exc_val, exc_tb):
            if not exc_val:
                # Commit only on a clean exit, per the Storage contract
                self.db.write(self.batch)

    def write_batch(self):
        return RocksDB.WriteBatch(self.db)

    class Iterator(object):
        def __init__(self, db, prefix):
            self.it = db.iteritems()
            self.prefix = prefix

        def __iter__(self):
            self.it.seek(self.prefix)
            return self

        def __next__(self):
            k, v = self.it.__next__()
            if not k.startswith(self.prefix):
                # Keys come back sorted, so we're already past the prefix
                raise StopIteration
            return k, v

    def iterator(self, prefix=b""):
        return RocksDB.Iterator(self.db, prefix)

    def put(self, key, value):
        return self.db.put(key, value)


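Both custom iterators lean on the same trick: keys are iterated in sorted order, so you seek to the prefix and stop at the first non-matching key. A pure-Python illustration of why the early break is safe:

```python
# Emulate seek(prefix) + stop-at-first-mismatch over a sorted key list.
import bisect

keys = sorted([b"t123", b"u-aa", b"u-ab", b"v9"])
prefix = b"u-"
matches = []
for k in keys[bisect.bisect_left(keys, prefix):]:  # "seek" to the prefix
    if not k.startswith(prefix):
        break  # sorted order: no later key can match
    matches.append(k)
assert matches == [b"u-aa", b"u-ab"]
```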
class LMDB(Storage):
    lmdb = None

    def __init__(self, name, create_if_missing=False, error_if_exists=False,
                 compression=None):
        super().__init__(name, create_if_missing, error_if_exists,
                         compression)
        import lmdb
        LMDB.lmdb = lmdb
        # Open the environment at the database's own path so that two
        # databases don't end up sharing one directory.
        self.env = lmdb.Environment(name, subdir=True,
                                    create=create_if_missing,
                                    max_dbs=32, map_size=5 * 10 ** 10)
        self.db = self.env.open_db(create=create_if_missing)

    def get(self, key):
        with self.env.begin(db=self.db) as tx:
            return tx.get(key)

    def put(self, key, value):
        with self.env.begin(db=self.db, write=True) as tx:
            tx.put(key, value)

    def write_batch(self):
        # An LMDB write transaction commits on clean exit and aborts on
        # exception, so it satisfies the write_batch() contract as-is.
        return self.env.begin(db=self.db, write=True)

    def iterator(self, prefix=b""):
        return LMDB.Iterator(self.db, self.env, prefix)

    class Iterator:
        def __init__(self, db, env, prefix):
            self.transaction = env.begin(db=db)
            self.transaction.__enter__()
            self.db = db
            self.prefix = prefix

        def __iter__(self):
            self.iterator = LMDB.lmdb.Cursor(self.db, self.transaction)
            self.iterator.set_range(self.prefix)
            return self

        def __next__(self):
            k, v = self.iterator.item()
            if not k or not k.startswith(self.prefix):
                # Unpositioned cursor or past the prefix: end the read
                # transaction and stop.
                self.transaction.abort()
                raise StopIteration
            self.iterator.next()  # advance only after capturing this item
            return k, v
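Since all three classes implement the same surface, a single smoke test can exercise any of them; here it runs against the hypothetical MemoryDB sketched earlier (swap in LevelDB, RocksDB or LMDB with a real path):

```python
# Exercise the full Storage surface: put/get, batched writes, prefix scan.
def smoke_test(db):
    db.put(b"u-1", b"a")
    with db.write_batch() as batch:
        batch.put(b"u-2", b"b")
        batch.put(b"t-1", b"c")
    assert db.get(b"u-1") == b"a"
    assert [k for k, v in db.iterator(prefix=b"u-")] == [b"u-1", b"u-2"]

smoke_test(MemoryDB("test"))
```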