Move bulk of UTXO flush logic to db.py
This commit is contained in:
parent
9515e1a1e4
commit
d1510b1192
@ -683,46 +683,13 @@ class BlockProcessor(electrumx.server.db.DB):
|
||||
|
||||
def flush_utxos(self, batch):
|
||||
'''Flush the cached DB writes and UTXO set to the batch.'''
|
||||
# Care is needed because the writes generated by flushing the
|
||||
# UTXO state may have keys in common with our write cache or
|
||||
# may be in the DB already.
|
||||
flush_start = time.time()
|
||||
delete_count = len(self.db_deletes) // 2
|
||||
utxo_cache_len = len(self.utxo_cache)
|
||||
|
||||
# Spends
|
||||
batch_delete = batch.delete
|
||||
for key in sorted(self.db_deletes):
|
||||
batch_delete(key)
|
||||
self.flush_utxo_db(batch, self.db_deletes, self.utxo_cache,
|
||||
self.undo_infos, self.height, self.tx_count,
|
||||
self.tip)
|
||||
self.db_deletes = []
|
||||
|
||||
# New UTXOs
|
||||
batch_put = batch.put
|
||||
for cache_key, cache_value in self.utxo_cache.items():
|
||||
# suffix = tx_idx + tx_num
|
||||
hashX = cache_value[:-12]
|
||||
suffix = cache_key[-2:] + cache_value[-12:-8]
|
||||
batch_put(b'h' + cache_key[:4] + suffix, hashX)
|
||||
batch_put(b'u' + hashX + suffix, cache_value[-8:])
|
||||
self.utxo_cache = {}
|
||||
|
||||
# New undo information
|
||||
self.flush_undo_infos(batch_put, self.undo_infos)
|
||||
self.undo_infos = []
|
||||
|
||||
if self.utxo_db.for_sync:
|
||||
self.logger.info('flushed {:,d} blocks with {:,d} txs, {:,d} UTXO '
|
||||
'adds, {:,d} spends in {:.1f}s, committing...'
|
||||
.format(self.height - self.db_height,
|
||||
self.tx_count - self.db_tx_count,
|
||||
utxo_cache_len, delete_count,
|
||||
time.time() - flush_start))
|
||||
|
||||
self.utxo_flush_count = self.history.flush_count
|
||||
self.db_tx_count = self.tx_count
|
||||
self.db_height = self.height
|
||||
self.db_tip = self.tip
|
||||
|
||||
async def _process_prefetched_blocks(self):
|
||||
'''Loop forever processing blocks as they arrive.'''
|
||||
while True:
|
||||
|
||||
@ -193,6 +193,45 @@ class DB(object):
|
||||
def flush_history(self):
|
||||
self.history.flush()
|
||||
|
||||
def flush_utxo_db(self, batch, deletes, adds, undo_infos,
|
||||
to_height, to_tx_count, to_tip):
|
||||
'''Flush the cached DB writes and UTXO set to the batch.'''
|
||||
# Care is needed because the writes generated by flushing the
|
||||
# UTXO state may have keys in common with our write cache or
|
||||
# may be in the DB already.
|
||||
start_time = time.time()
|
||||
|
||||
# Spends
|
||||
batch_delete = batch.delete
|
||||
for key in sorted(deletes):
|
||||
batch_delete(key)
|
||||
|
||||
# New UTXOs
|
||||
batch_put = batch.put
|
||||
for key, value in adds.items():
|
||||
# suffix = tx_idx + tx_num
|
||||
hashX = value[:-12]
|
||||
suffix = key[-2:] + value[-12:-8]
|
||||
batch_put(b'h' + key[:4] + suffix, hashX)
|
||||
batch_put(b'u' + hashX + suffix, value[-8:])
|
||||
|
||||
# New undo information
|
||||
self.flush_undo_infos(batch_put, undo_infos)
|
||||
|
||||
if self.utxo_db.for_sync:
|
||||
block_count = to_height - self.db_height
|
||||
tx_count = to_tx_count - self.db_tx_count
|
||||
elapsed = time.time() - start_time
|
||||
self.logger.info(f'flushed {block_count:,d} blocks with '
|
||||
f'{tx_count:,d} txs, {len(adds):,d} UTXO adds, '
|
||||
f'{len(deletes) // 2:,d} spends in '
|
||||
f'{elapsed:.1f}s, committing...')
|
||||
|
||||
self.utxo_flush_count = self.history.flush_count
|
||||
self.db_height = to_height
|
||||
self.db_tx_count = to_tx_count
|
||||
self.db_tip = to_tip
|
||||
|
||||
def db_assert_flushed(self, to_tx_count, to_height):
|
||||
'''Asserts state is fully flushed.'''
|
||||
assert to_tx_count == self.fs_tx_count == self.db_tx_count
|
||||
|
||||
Loading…
Reference in New Issue
Block a user