Open DB differently depending on whether syncing

If syncing, open the DB with a high max_open_files; otherwise use a lower one.
parent 0afddb7bc3
commit 1b95bcd8ac
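The effect, as a minimal sketch assuming the plyvel backend (the helper name
and database path below are illustrative, not from the commit):

    import plyvel

    def open_for(for_sync, name='example-db'):
        # generous file-handle budget during the compaction-heavy initial
        # sync; a smaller one once the server is merely tracking the tip
        mof = 1024 if for_sync else 256
        return plyvel.DB(name, create_if_missing=True, max_open_files=mof)

    db = open_for(True)    # initial sync
    db.close()             # LevelDB holds a LOCK file; close before reopening
    db = open_for(False)   # caught up: reopen with a smaller budget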
server/block_processor.py

@@ -229,9 +229,12 @@ class BlockProcessor(server.db.DB):
         self.caught_up = True
         if self.first_sync:
             self.first_sync = False
-            self.logger.info('{} synced to height {:,d}.  DB version:'
-                             .format(VERSION, self.height, self.db_version))
-        self.flush(True)
+            self.logger.info('{} synced to height {:,d}'
+                             .format(VERSION, self.height))
+            self.flush(True)
+            self.reopen_db(False)
+        else:
+            self.flush(True)
         self.event.set()

     async def handle_chain_reorg(self, count, touched):
server/db.py (59 lines changed)
@@ -50,16 +50,8 @@ class DB(LoggedClass):
         self.logger.info('reorg limit is {:,d} blocks'
                          .format(self.env.reorg_limit))

-        # Open DB and metadata files.  Record some of its state.
-        db_name = '{}-{}'.format(self.coin.NAME, self.coin.NET)
-        self.db = open_db(db_name, env.db_engine)
-        if self.db.is_new:
-            self.logger.info('created new {} database {}'
-                             .format(env.db_engine, db_name))
-        else:
-            self.logger.info('successfully opened {} database {}'
-                             .format(env.db_engine, db_name))
-        self.read_state()
+        self.db = None
+        self.reopen_db(True)

         create = self.db_height == -1
         self.headers_file = self.open_file('headers', create)
@@ -77,6 +69,41 @@ class DB(LoggedClass):
             assert self.db_tx_count == 0
             self.clean_db()

+    def reopen_db(self, first_sync):
+        '''Open the database.  If the database is already open, it is
+        closed (implicitly via GC) and re-opened.
+
+        Re-open to set the maximum number of open files appropriately.
+        '''
+        if self.db:
+            self.logger.info('closing DB to re-open')
+            self.db.close()
+
+        # Open DB and metadata files.  Record some of its state.
+        db_name = '{}-{}'.format(self.coin.NAME, self.coin.NET)
+        self.db = open_db(db_name, self.env.db_engine, first_sync)
+        if self.db.is_new:
+            self.logger.info('created new {} database {}'
+                             .format(self.env.db_engine, db_name))
+        else:
+            self.logger.info('successfully opened {} database {} for sync: {}'
+                             .format(self.env.db_engine, db_name, first_sync))
+        self.read_state()
+
+        if self.first_sync == first_sync:
+            self.logger.info('software version: {}'.format(VERSION))
+            self.logger.info('DB version: {:d}'.format(self.db_version))
+            self.logger.info('coin: {}'.format(self.coin.NAME))
+            self.logger.info('network: {}'.format(self.coin.NET))
+            self.logger.info('height: {:,d}'.format(self.db_height))
+            self.logger.info('tip: {}'.format(hash_to_str(self.db_tip)))
+            self.logger.info('tx count: {:,d}'.format(self.db_tx_count))
+            if self.first_sync:
+                self.logger.info('sync time so far: {}'
+                                 .format(formatted_time(self.wall_time)))
+        else:
+            self.reopen_db(self.first_sync)
+
     def read_state(self):
         if self.db.is_new:
             self.db_height = -1
@@ -110,16 +137,6 @@ class DB(LoggedClass):
         self.wall_time = state['wall_time']
         self.first_sync = state['first_sync']

-        self.logger.info('software version: {}'.format(VERSION))
-        self.logger.info('DB version: {:d}'.format(self.db_version))
-        self.logger.info('coin: {}'.format(self.coin.NAME))
-        self.logger.info('network: {}'.format(self.coin.NET))
-        self.logger.info('height: {:,d}'.format(self.db_height))
-        self.logger.info('tip: {}'.format(hash_to_str(self.db_tip)))
-        self.logger.info('tx count: {:,d}'.format(self.db_tx_count))
-        if self.first_sync:
-            self.logger.info('sync time so far: {}'
-                             .format(formatted_time(self.wall_time)))
         if self.flush_count < self.utxo_flush_count:
             raise self.DBError('DB corrupt: flush_count < utxo_flush_count')
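A note on the 'if self.first_sync == first_sync' guard in the new reopen_db():
the constructor guesses first_sync=True before any state exists, read_state()
then loads the persisted first_sync flag, and a mismatch triggers exactly one
recursive reopen with the correct setting. A toy, standalone sketch of that
pattern (all names illustrative):

    PERSISTED = False   # stands in for the first_sync flag read_state() restores

    def reopen(guess):
        print('opened with for_sync={}'.format(guess))   # stands in for open_db()
        if PERSISTED != guess:
            reopen(PERSISTED)   # guessed wrong: reopen once with the right mode

    reopen(True)   # opens twice here, since the initial guess was wrong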
server/storage.py

@@ -16,12 +16,12 @@ from functools import partial
 from lib.util import subclasses, increment_byte_string


-def open_db(name, db_engine):
+def open_db(name, db_engine, for_sync):
     '''Returns a database handle.'''
     for db_class in subclasses(Storage):
         if db_class.__name__.lower() == db_engine.lower():
             db_class.import_module()
-            return db_class(name)
+            return db_class(name, for_sync)

     raise RuntimeError('unrecognised DB engine "{}"'.format(db_engine))

@@ -29,9 +29,9 @@ def open_db(name, db_engine):
 class Storage(object):
     '''Abstract base class of the DB backend abstraction.'''

-    def __init__(self, name):
+    def __init__(self, name, for_sync):
         self.is_new = not os.path.exists(name)
-        self.open(name, create=self.is_new)
+        self.open(name, create=self.is_new, for_sync=for_sync)

     @classmethod
     def import_module(cls):
@@ -42,6 +42,10 @@ class Storage(object):
         '''Open an existing database or create a new one.'''
         raise NotImplementedError

+    def close(self):
+        '''Close an existing database.'''
+        raise NotImplementedError
+
     def get(self, key):
         raise NotImplementedError

@@ -75,9 +79,11 @@ class LevelDB(Storage):
         import plyvel
         cls.module = plyvel

-    def open(self, name, create):
+    def open(self, name, create, for_sync):
+        mof = 1024 if for_sync else 256
         self.db = self.module.DB(name, create_if_missing=create,
-                                 max_open_files=256, compression=None)
+                                 max_open_files=mof, compression=None)
+        self.close = self.db.close
         self.get = self.db.get
         self.put = self.db.put
         self.iterator = self.db.iterator
@@ -92,18 +98,25 @@ class RocksDB(Storage):
         import rocksdb
         cls.module = rocksdb

-    def open(self, name, create):
+    def open(self, name, create, for_sync):
+        mof = 1024 if for_sync else 256
         compression = "no"
         compression = getattr(self.module.CompressionType,
                               compression + "_compression")
         options = self.module.Options(create_if_missing=create,
                                       compression=compression,
                                       target_file_size_base=33554432,
-                                      max_open_files=1024)
+                                      max_open_files=mof)
         self.db = self.module.DB(name, options)
         self.get = self.db.get
         self.put = self.db.put

+    def close(self):
+        # PyRocksDB doesn't provide a close method; hopefully this is enough
+        self.db = None
+        import gc
+        gc.collect()
+
     class WriteBatch(object):
         def __init__(self, db):
             self.batch = RocksDB.module.WriteBatch()
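The new RocksDB close() works around pyrocksdb of this era exposing no close
method: dropping the last reference and forcing a collection lets the
finalizer release the database lock. The same idea as a standalone sketch
(the path is illustrative):

    import gc
    import rocksdb

    db = rocksdb.DB('example-rocksdb', rocksdb.Options(create_if_missing=True))
    db = None      # drop the only reference so the finalizer can run
    gc.collect()   # collect promptly; the LOCK is released on finalization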
@@ -157,11 +170,15 @@ class LMDB(Storage):
         import lmdb
         cls.module = lmdb

-    def open(self, name, create):
+    def open(self, name, create, for_sync):
+        # I don't see anything equivalent to max_open_files for for_sync
         self.env = LMDB.module.Environment('.', subdir=True, create=create,
                                            max_dbs=32, map_size=5 * 10 ** 10)
         self.db = self.env.open_db(create=create)

+    def close(self):
+        self.env.close()
+
     def get(self, key):
         with self.env.begin(db=self.db) as tx:
             return tx.get(key)
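LMDB only gains the pass-through parameter: it memory-maps a single data file,
so there is no real analogue of max_open_files to tune; capacity is instead
reserved up front through map_size, as the existing open() shows. A standalone
sketch (the path is illustrative):

    import lmdb

    env = lmdb.Environment('example-lmdb', subdir=True, create=True,
                           max_dbs=32, map_size=5 * 10 ** 10)  # ~50 GB map
    db = env.open_db(create=True)
    env.close()   # unlike pyrocksdb, LMDB provides an explicit close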
tests/test_storage.py

@@ -21,7 +21,7 @@ for c in subclasses(Storage):
 def db(tmpdir, request):
     cwd = os.getcwd()
     os.chdir(str(tmpdir))
-    db = open_db("db", request.param)
+    db = open_db("db", request.param, False)
     os.chdir(cwd)
     yield db
     # Make sure all the locks and handles are closed
@@ -66,4 +66,4 @@ def test_iterator_reverse(db):
     assert list(db.iterator(prefix=b"abc", reverse=True)) == [
         (b"abc" + str.encode(str(i)), str.encode(str(i))) for
         i in reversed(range(5))
-        ]
+    ]
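With the new signature, callers pick the file-handle budget through the third
argument. A hypothetical call site, following the '{COIN}-{NET}' naming used
in db.py (the names are illustrative):

    db = open_db('Bitcoin-mainnet', 'leveldb', True)    # syncing: mof = 1024
    db.close()
    db = open_db('Bitcoin-mainnet', 'leveldb', False)   # serving: mof = 256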