Move flush_backup() to db.py
This commit is contained in:
parent
42c3a308db
commit
891730e78f
@ -261,7 +261,7 @@ class BlockProcessor(DB):
|
||||
for hex_hashes in chunks(hashes, 50):
|
||||
raw_blocks = await get_raw_blocks(last, hex_hashes)
|
||||
await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks)
|
||||
await self.backup_flush()
|
||||
await self.flush_for_backup()
|
||||
last -= len(raw_blocks)
|
||||
await self.prefetcher.reset_height(self.height)
|
||||
|
||||
@ -319,6 +319,16 @@ class BlockProcessor(DB):
|
||||
assert not self.db_deletes
|
||||
self.db_assert_flushed(self.tx_count, self.height)
|
||||
|
||||
async def flush_for_backup(self):
|
||||
# self.touched can include other addresses which is
|
||||
# harmless, but remove None.
|
||||
self.touched.discard(None)
|
||||
await self.run_in_thread_with_lock(
|
||||
self.flush_backup, self.flush_data(), self.touched)
|
||||
self.db_deletes = []
|
||||
self.utxo_cache = {}
|
||||
self.undo_infos = []
|
||||
|
||||
async def flush(self, flush_utxos):
|
||||
if self.height == self.db_height:
|
||||
self.assert_flushed()
|
||||
@ -367,46 +377,6 @@ class BlockProcessor(DB):
|
||||
.format(formatted_time(self.wall_time),
|
||||
formatted_time(tx_est / this_tx_per_sec)))
|
||||
|
||||
async def backup_flush(self):
|
||||
assert self.height < self.db_height
|
||||
assert not self.headers
|
||||
assert not self.tx_hashes
|
||||
self.history.assert_flushed()
|
||||
await self.run_in_thread_with_lock(self._backup_flush_body)
|
||||
|
||||
def _backup_flush_body(self):
|
||||
'''Like flush() but when backing up. All UTXOs are flushed.
|
||||
|
||||
hashXs - sequence of hashXs which were touched by backing
|
||||
up. Searched for history entries to remove after the backup
|
||||
height.
|
||||
'''
|
||||
flush_start = time.time()
|
||||
|
||||
self.backup_fs(self.height, self.tx_count)
|
||||
|
||||
# Backup history. self.touched can include other addresses
|
||||
# which is harmless, but remove None.
|
||||
self.touched.discard(None)
|
||||
nremoves = self.history.backup(self.touched, self.tx_count)
|
||||
self.logger.info('backing up removed {:,d} history entries'
|
||||
.format(nremoves))
|
||||
|
||||
with self.utxo_db.write_batch() as batch:
|
||||
self.flush_utxo_db(batch, self.flush_data())
|
||||
# Flush state last as it reads the wall time.
|
||||
self.flush_state(batch)
|
||||
|
||||
self.db_deletes = []
|
||||
self.utxo_cache = {}
|
||||
self.undo_infos = []
|
||||
|
||||
self.logger.info('backup flush #{:,d} took {:.1f}s. '
|
||||
'Height {:,d} txs: {:,d}'
|
||||
.format(self.history.flush_count,
|
||||
self.last_flush - flush_start,
|
||||
self.height, self.tx_count))
|
||||
|
||||
def check_cache_size(self):
|
||||
'''Flush a cache if it gets too big.'''
|
||||
# Good average estimates based on traversal of subobjects and
|
||||
|
||||
@ -282,6 +282,28 @@ class DB(object):
|
||||
self.last_flush_tx_count = self.fs_tx_count
|
||||
self.write_utxo_state(batch)
|
||||
|
||||
def flush_backup(self, flush_data, touched):
|
||||
'''Like flush_dbs() but when backing up. All UTXOs are flushed.'''
|
||||
assert not flush_data.headers
|
||||
assert not flush_data.block_tx_hashes
|
||||
assert flush_data.height < self.db_height
|
||||
self.history.assert_flushed()
|
||||
|
||||
start_time = time.time()
|
||||
tx_delta = flush_data.tx_count - self.last_flush_tx_count
|
||||
|
||||
self.backup_fs(flush_data.height, flush_data.tx_count)
|
||||
self.history.backup(touched, flush_data.tx_count)
|
||||
with self.utxo_db.write_batch() as batch:
|
||||
self.flush_utxo_db(batch, flush_data)
|
||||
# Flush state last as it reads the wall time.
|
||||
self.flush_state(batch)
|
||||
|
||||
elapsed = self.last_flush - start_time
|
||||
self.logger.info(f'backup flush #{self.history.flush_count:,d} took '
|
||||
f'{elapsed:.1f}s. Height {flush_data.height:,d} '
|
||||
f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')
|
||||
|
||||
def db_assert_flushed(self, to_tx_count, to_height):
|
||||
'''Asserts state is fully flushed.'''
|
||||
assert to_tx_count == self.fs_tx_count == self.db_tx_count
|
||||
|
||||
@ -85,7 +85,7 @@ class History(object):
|
||||
if flush_id > utxo_flush_count:
|
||||
keys.append(key)
|
||||
|
||||
self.logger.info('deleting {:,d} history entries'.format(len(keys)))
|
||||
self.logger.info(f'deleting {len(keys):,d} history entries')
|
||||
|
||||
self.flush_count = utxo_flush_count
|
||||
with self.db.write_batch() as batch:
|
||||
@ -144,7 +144,6 @@ class History(object):
|
||||
self.logger.info(f'flushed history in {elapsed:.1f}s '
|
||||
f'for {count:,d} addrs')
|
||||
|
||||
|
||||
def backup(self, hashXs, tx_count):
|
||||
# Not certain this is needed, but it doesn't hurt
|
||||
self.flush_count += 1
|
||||
@ -172,7 +171,7 @@ class History(object):
|
||||
batch.put(key, value)
|
||||
self.write_state(batch)
|
||||
|
||||
return nremoves
|
||||
self.logger.info(f'backing up removed {nremoves:,d} history entries')
|
||||
|
||||
def get_txnums(self, hashX, limit=1000):
|
||||
'''Generator that returns an unpruned, sorted list of tx_nums in the
|
||||
|
||||
Loading…
Reference in New Issue
Block a user