Move history flushing to DB.flush_history()
This commit is contained in:
parent
11c6c919a6
commit
c9631f3438
@ -344,18 +344,13 @@ class BlockProcessor(electrumx.server.db.DB):
|
||||
tx_diff = self.tx_count - self.last_flush_tx_count
|
||||
|
||||
# Flush to file system
|
||||
self.fs_flush(self.height, self.tx_count, self.headers,
|
||||
self.flush_fs(self.height, self.tx_count, self.headers,
|
||||
self.tx_hashes)
|
||||
self.tx_hashes = []
|
||||
self.headers = []
|
||||
|
||||
fs_end = time.time()
|
||||
|
||||
# History next - it's fast and frees memory
|
||||
hashX_count = self.history.flush()
|
||||
if self.utxo_db.for_sync:
|
||||
self.logger.info('flushed history in {:.1f}s for {:,d} addrs'
|
||||
.format(time.time() - fs_end, hashX_count))
|
||||
# Then history
|
||||
self.flush_history()
|
||||
|
||||
# Flush state last as it reads the wall time.
|
||||
with self.utxo_db.write_batch() as batch:
|
||||
|
||||
@ -150,9 +150,8 @@ class DB(object):
|
||||
return await self.header_mc.branch_and_root(length, height)
|
||||
|
||||
# Flushing
|
||||
def fs_flush(self, to_height, to_tx_count, headers, block_tx_hashes):
|
||||
def flush_fs(self, to_height, to_tx_count, headers, block_tx_hashes):
|
||||
'''Write headers, tx counts and block tx hashes to the filesystem.
|
||||
No LevelDB state is updated.
|
||||
|
||||
The first height to write is self.fs_height + 1. The FS
|
||||
metadata is all append-only, so in a crash we just pick up
|
||||
@ -187,6 +186,9 @@ class DB(object):
|
||||
elapsed = time.time() - start_time
|
||||
self.logger.info(f'flushed to FS in {elapsed:.2f}s')
|
||||
|
||||
def flush_history(self):
|
||||
self.history.flush()
|
||||
|
||||
def db_assert_flushed(self, to_tx_count, to_height):
|
||||
'''Asserts state is fully flushed.'''
|
||||
assert to_tx_count == self.fs_tx_count == self.db_tx_count
|
||||
|
||||
@ -11,6 +11,7 @@
|
||||
import array
|
||||
import ast
|
||||
import bisect
|
||||
import time
|
||||
from collections import defaultdict
|
||||
from functools import partial
|
||||
from struct import pack, unpack
|
||||
@ -119,6 +120,7 @@ class History(object):
|
||||
assert not self.unflushed
|
||||
|
||||
def flush(self):
|
||||
start_time = time.time()
|
||||
self.flush_count += 1
|
||||
flush_id = pack('>H', self.flush_count)
|
||||
unflushed = self.unflushed
|
||||
@ -132,7 +134,12 @@ class History(object):
|
||||
count = len(unflushed)
|
||||
unflushed.clear()
|
||||
self.unflushed_count = 0
|
||||
return count
|
||||
|
||||
if self.db.for_sync:
|
||||
elapsed = time.time() - start_time
|
||||
self.logger.info(f'flushed history in {elapsed:.1f}s '
|
||||
f'for {count:,d} addrs')
|
||||
|
||||
|
||||
def backup(self, hashXs, tx_count):
|
||||
# Not certain this is needed, but it doesn't hurt
|
||||
|
||||
Loading…
Reference in New Issue
Block a user