Clear data by reference as it's flushed
parent dc445e2a54
commit a50d17c5b9
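
This change relies on Python's reference semantics: the collections handed to the DB flush routines are the same objects as the BlockProcessor's in-memory caches, so emptying them in place with .clear() as they are written out also empties the processor's attributes, and the flushed data becomes collectable immediately. Rebinding an attribute to a fresh container, by contrast, leaves the old copy alive until the callee drops its reference. A minimal sketch of the distinction (illustrative names, not ElectrumX code):

    # Clearing via a shared reference empties the owner's cache too.
    cache = [1, 2, 3]
    alias = cache        # same list object, as a FlushData field would hold
    alias.clear()        # in-place: both names now see an empty list
    assert cache == []

    # Rebinding only moves the name; the owner keeps the old data.
    cache = [4, 5, 6]
    alias = cache
    alias = []
    assert cache == [4, 5, 6]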
@@ -261,7 +261,11 @@ class BlockProcessor(DB):
         for hex_hashes in chunks(hashes, 50):
             raw_blocks = await get_raw_blocks(last, hex_hashes)
             await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks)
-            await self.flush_for_backup()
+            # self.touched can include other addresses which is
+            # harmless, but remove None.
+            self.touched.discard(None)
+            await self.run_in_thread_with_lock(
+                self.flush_backup, self.flush_data(), self.touched)
             last -= len(raw_blocks)
         await self.prefetcher.reset_height(self.height)

@@ -326,16 +330,6 @@ class BlockProcessor(DB):
             self.tx_hashes, self.undo_infos, self.utxo_cache,
             self.db_deletes, self.tip)

-    async def flush_for_backup(self):
-        # self.touched can include other addresses which is
-        # harmless, but remove None.
-        self.touched.discard(None)
-        await self.run_in_thread_with_lock(
-            self.flush_backup, self.flush_data(), self.touched)
-        self.db_deletes = []
-        self.utxo_cache = {}
-        self.undo_infos = []
-
     async def flush(self, flush_utxos):
         if self.height == self.db_height:
             self.assert_flushed()
@@ -354,12 +348,6 @@ class BlockProcessor(DB):
                 max(coin.TX_COUNT - self.tx_count, 0)) * factor

         self.flush_dbs(self.flush_data(), flush_utxos, estimated_txs)
-        self.tx_hashes = []
-        self.headers = []
-        if flush_utxos:
-            self.db_deletes = []
-            self.utxo_cache = {}
-            self.undo_infos = []

     def check_cache_size(self):
         '''Flush a cache if it gets too big.'''
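
The deletions above work because flush_data() packages references to the live caches rather than copies; once the DB-side flush clears each collection in place, the processor's caches are already empty when control returns, so the explicit reassignments become dead weight. A rough sketch of that wiring, using a pared-down, hypothetical stand-in for FlushData (field names abridged; the real namedtuple carries more state):

    from collections import namedtuple

    # Hypothetical, simplified stand-ins for the real classes.
    FlushData = namedtuple('FlushData', 'block_tx_hashes headers adds deletes')

    class Processor:
        def __init__(self):
            self.tx_hashes = []
            self.headers = []
            self.utxo_cache = {}
            self.db_deletes = []

        def flush_data(self):
            # Packages references, not copies, of the live caches.
            return FlushData(self.tx_hashes, self.headers,
                             self.utxo_cache, self.db_deletes)

    def flush_dbs(flush_data):
        # ... persist flush_data, then release it in place:
        flush_data.block_tx_hashes.clear()
        flush_data.headers.clear()
        flush_data.adds.clear()
        flush_data.deletes.clear()

    p = Processor()
    p.tx_hashes.append(b'\x00' * 32)
    flush_dbs(p.flush_data())
    assert p.tx_hashes == []   # cleared by reference; no reassignment needed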
@@ -222,6 +222,7 @@ class DB(object):
                      else 0)
         assert len(self.tx_counts) == flush_data.height + 1
         hashes = b''.join(flush_data.block_tx_hashes)
+        flush_data.block_tx_hashes.clear()
         assert len(hashes) % 32 == 0
         assert len(hashes) // 32 == flush_data.tx_count - prior_tx_count

@@ -231,6 +232,8 @@ class DB(object):
         offset = self.header_offset(height_start)
         self.headers_file.write(offset, b''.join(flush_data.headers))
         self.fs_update_header_offsets(offset, height_start, flush_data.headers)
+        flush_data.headers.clear()
+
         offset = height_start * self.tx_counts.itemsize
         self.tx_counts_file.write(offset,
                                   self.tx_counts[height_start:].tobytes())
@@ -253,11 +256,14 @@ class DB(object):
         # UTXO state may have keys in common with our write cache or
         # may be in the DB already.
         start_time = time.time()
+        add_count = len(flush_data.adds)
+        spend_count = len(flush_data.deletes) // 2

         # Spends
         batch_delete = batch.delete
         for key in sorted(flush_data.deletes):
             batch_delete(key)
+        flush_data.deletes.clear()

         # New UTXOs
         batch_put = batch.put
@@ -267,15 +273,15 @@ class DB(object):
             suffix = key[-2:] + value[-12:-8]
             batch_put(b'h' + key[:4] + suffix, hashX)
             batch_put(b'u' + hashX + suffix, value[-8:])
+        flush_data.adds.clear()

         # New undo information
         self.flush_undo_infos(batch_put, flush_data.undo_infos)
+        flush_data.undo_infos.clear()

         if self.utxo_db.for_sync:
             block_count = flush_data.height - self.db_height
             tx_count = flush_data.tx_count - self.db_tx_count
-            add_count = len(flush_data.adds)
-            spend_count = len(flush_data.deletes) // 2
             elapsed = time.time() - start_time
             self.logger.info(f'flushed {block_count:,d} blocks with '
                              f'{tx_count:,d} txs, {add_count:,d} UTXO adds, '
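
Note the ordering in the two hunks above: add_count and spend_count are now captured at the top of the flush, before the spends and adds are written and cleared, because once flush_data.adds.clear() and flush_data.deletes.clear() have run, their lengths read as zero and the sync log line would report nothing. In sketch form (illustrative names only):

    def flush_utxos(flush_data):
        # Capture sizes first: after clear() the lengths are gone.
        add_count = len(flush_data.adds)
        spend_count = len(flush_data.deletes) // 2
        # ... write spends and adds to the batch ...
        flush_data.deletes.clear()
        flush_data.adds.clear()
        print(f'{add_count:,d} UTXO adds, {spend_count:,d} spends')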