Improve flow for opening DBs

commit 9515e1a1e4
parent aac84ade75
@@ -48,7 +48,7 @@ async def compact_history():
     environ['DAEMON_URL'] = ''    # Avoid Env erroring out
     env = Env()
     db = DB(env)
-    await db.open_for_sync()
+    await db.open_for_compacting()

     assert not db.first_sync
     history = db.history
@@ -62,7 +62,7 @@ async def query(args):
     db = DB(env)
     coin = env.coin

-    await db._open_dbs(False)
+    await db.open_for_serving()

     if not args.scripts:
         await print_stats(db.hist_db, db.utxo_db)
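With the two script hunks above, the compaction and query tools no longer reach into the private _open_dbs helper; they go through the public coroutines instead. A minimal, self-contained sketch of that calling pattern (the import paths and the asyncio wrapper are assumptions, not part of this diff; Env, DB and the attribute names come from the hunks):

    import asyncio
    from os import environ

    from electrumx import Env                 # assumed import path
    from electrumx.server.db import DB        # assumed import path

    async def main():
        environ['DAEMON_URL'] = ''   # Avoid Env erroring out, as in the hunk above
        db = DB(Env())
        # open_for_compacting() for the compaction script,
        # open_for_serving() for read-only queries:
        await db.open_for_serving()
        print(f'height: {db.db_height:,d}  tx count: {db.db_tx_count:,d}')

    asyncio.run(main())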
@@ -159,7 +159,6 @@ class BlockProcessor(electrumx.server.db.DB):
         self.prefetcher = Prefetcher(daemon, env.coin, self.blocks_event)

         # Meta
-        self.cache_MB = env.cache_MB
         self.next_cache_check = 0
         self.last_flush = time.time()
         self.touched = set()
@@ -448,8 +447,9 @@ class BlockProcessor(electrumx.server.db.DB):

         # Flush history if it takes up over 20% of cache memory.
         # Flush UTXOs once they take up 80% of cache memory.
-        if utxo_MB + hist_MB >= self.cache_MB or hist_MB >= self.cache_MB // 5:
-            return utxo_MB >= self.cache_MB * 4 // 5
+        cache_MB = self.env.cache_MB
+        if utxo_MB + hist_MB >= cache_MB or hist_MB >= cache_MB // 5:
+            return utxo_MB >= cache_MB * 4 // 5
         return None

     def advance_blocks(self, blocks):
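The rewritten check reads cache_MB from the environment on every call instead of caching it on the instance (the attribute removed in the -159,7 hunk). A standalone sketch of the same threshold arithmetic, using illustrative names that are not part of the codebase:

    def check_cache_flush(utxo_MB, hist_MB, cache_MB):
        # Mirrors the decision above: None = no flush, False = flush
        # history only, True = flush the UTXO cache as well.
        if utxo_MB + hist_MB >= cache_MB or hist_MB >= cache_MB // 5:
            return utxo_MB >= cache_MB * 4 // 5
        return None

    # With a 1,200 MB cache: history alone triggers a flush at 240 MB (20%),
    # and the UTXO cache is included once it reaches 960 MB (80%).
    assert check_cache_flush(100, 250, 1200) is False    # history over 20%
    assert check_cache_flush(1000, 250, 1200) is True    # UTXOs over 80%
    assert check_cache_flush(100, 100, 1200) is None     # nothing to flush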
@@ -755,18 +755,10 @@ class BlockProcessor(electrumx.server.db.DB):

     async def _first_open_dbs(self):
         await self.open_for_sync()
-        # An incomplete compaction needs to be cancelled otherwise
-        # restarting it will corrupt the history
-        self.history.cancel_compaction()
-        # These are our state as we move ahead of DB state
-        self.fs_height = self.db_height
-        self.fs_tx_count = self.db_tx_count
         self.height = self.db_height
         self.tip = self.db_tip
         self.tx_count = self.db_tx_count
         self.last_flush_tx_count = self.tx_count
-        if self.utxo_db.for_sync:
-            self.logger.info(f'flushing DB cache at {self.cache_MB:,d} MB')

     # --- External API
@@ -90,7 +90,7 @@ class DB(object):
         else:
             assert self.db_tx_count == 0

-    async def _open_dbs(self, for_sync):
+    async def _open_dbs(self, for_sync, compacting):
         assert self.utxo_db is None

         # First UTXO DB
@@ -110,12 +110,16 @@ class DB(object):

         # Then history DB
         self.utxo_flush_count = self.history.open_db(self.db_class, for_sync,
-                                                     self.utxo_flush_count)
+                                                     self.utxo_flush_count,
+                                                     compacting)
         self.clear_excess_undo_info()

         # Read TX counts (requires meta directory)
         await self._read_tx_counts()

+    async def open_for_compacting(self):
+        await self._open_dbs(True, True)
+
     async def open_for_sync(self):
         '''Open the databases to sync to the daemon.

@@ -123,7 +127,7 @@ class DB(object):
         synchronization. When serving clients we want the open files for
         serving network connections.
         '''
-        await self._open_dbs(True)
+        await self._open_dbs(True, False)

     async def open_for_serving(self):
         '''Open the databases for serving. If they are already open they are
@@ -134,7 +138,7 @@ class DB(object):
             self.utxo_db.close()
             self.history.close_db()
             self.utxo_db = None
-        await self._open_dbs(False)
+        await self._open_dbs(False, False)

     # Header merkle cache

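Taken together, the call-site hunks above make every path into _open_dbs explicit about both flags. A small illustrative summary of the mapping this commit establishes (the dict itself is not in the codebase, only the coroutine names and flag values are):

    # coroutine             -> (for_sync, compacting)
    OPEN_MODES = {
        'open_for_compacting': (True,  True),   # sync-mode files, keep in-progress compaction state
        'open_for_sync':       (True,  False),  # sync-mode files, cancel a stale compaction
        'open_for_serving':    (False, False),  # serving-mode files, cancel a stale compaction
    }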
@@ -395,6 +399,10 @@ class DB(object):
         self.wall_time = state['wall_time']
         self.first_sync = state['first_sync']

+        # These are our state as we move ahead of DB state
+        self.fs_height = self.db_height
+        self.fs_tx_count = self.db_tx_count
+
         # Log some stats
         self.logger.info('DB version: {:d}'.format(self.db_version))
         self.logger.info('coin: {}'.format(self.coin.NAME))
@@ -402,6 +410,8 @@ class DB(object):
         self.logger.info('height: {:,d}'.format(self.db_height))
         self.logger.info('tip: {}'.format(hash_to_hex_str(self.db_tip)))
         self.logger.info('tx count: {:,d}'.format(self.db_tx_count))
+        if self.utxo_db.for_sync:
+            self.logger.info(f'flushing DB cache at {self.env.cache_MB:,d} MB')
         if self.first_sync:
             self.logger.info('sync time so far: {}'
                              .format(util.formatted_time(self.wall_time)))
@@ -32,10 +32,14 @@ class History(object):
         self.unflushed_count = 0
         self.db = None

-    def open_db(self, db_class, for_sync, utxo_flush_count):
+    def open_db(self, db_class, for_sync, utxo_flush_count, compacting):
         self.db = db_class('hist', for_sync)
         self.read_state()
         self.clear_excess(utxo_flush_count)
+        # An incomplete compaction needs to be cancelled otherwise
+        # restarting it will corrupt the history
+        if not compacting:
+            self._cancel_compaction()
         return self.flush_count

     def close_db(self):
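The compacting flag added above exists so that the compaction script, which opens the databases precisely in order to run or resume a compaction, does not cancel its own in-progress state, while every other open still does. A toy stand-in for that guard, illustrative only and not the real History class:

    class ToyHistory:
        def __init__(self):
            self.comp_cursor = -1        # -1 means no compaction in progress
            self.comp_flush_count = -1

        def open_db(self, compacting):
            # An incomplete compaction must be cancelled unless we are the
            # compaction process itself, which will resume it instead.
            if not compacting:
                self._cancel_compaction()

        def _cancel_compaction(self):
            if self.comp_cursor != -1:
                print('cancelling in-progress history compaction')
                self.comp_flush_count = -1
                self.comp_cursor = -1

    hist = ToyHistory()
    hist.comp_cursor = 5             # pretend a compaction was interrupted
    hist.open_db(compacting=True)    # compaction script: state is preserved
    hist.open_db(compacting=False)   # any other open: state is cancelled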
@@ -314,7 +318,7 @@ class History(object):
                                  100 * cursor / 65536))
         return write_size

-    def cancel_compaction(self):
+    def _cancel_compaction(self):
         if self.comp_cursor != -1:
             self.logger.warning('cancelling in-progress history compaction')
             self.comp_flush_count = -1