Remove remaining flush-related logic to db.py

This commit is contained in:
Neil Booth 2018-08-09 14:13:09 +09:00
parent a50d17c5b9
commit 27b31746f8
2 changed files with 41 additions and 44 deletions

View File

@ -187,17 +187,6 @@ class BlockProcessor(DB):
return await run_in_thread(func, *args)
return await asyncio.shield(run_in_thread_locked())
async def _maybe_flush(self):
# If caught up, flush everything as client queries are
# performed on the DB.
if self._caught_up_event.is_set():
await self.flush(True)
elif time.time() > self.next_cache_check:
flush_arg = self.check_cache_size()
if flush_arg is not None:
await self.flush(flush_arg)
self.next_cache_check = time.time() + 30
async def check_and_advance_blocks(self, raw_blocks):
'''Process the list of raw blocks passed. Detects and handles
reorgs.
@ -316,38 +305,36 @@ class BlockProcessor(DB):
return start, count
def estimate_txs_remaining(self):
# Try to estimate how many txs there are to go
daemon_height = self.daemon.cached_height()
coin = self.coin
tail_count = daemon_height - max(self.height, coin.TX_COUNT_HEIGHT)
# Damp the initial enthusiasm
realism = max(2.0 - 0.9 * self.height / coin.TX_COUNT_HEIGHT, 1.0)
return (tail_count * coin.TX_PER_BLOCK +
max(coin.TX_COUNT - self.tx_count, 0)) * realism
# - Flushing
def assert_flushed(self):
'''Asserts state is fully flushed.'''
assert not self.undo_infos
assert not self.utxo_cache
assert not self.db_deletes
self.db_assert_flushed(self.tx_count, self.height)
def flush_data(self):
return FlushData(self.height, self.tx_count, self.headers,
self.tx_hashes, self.undo_infos, self.utxo_cache,
self.db_deletes, self.tip)
async def flush(self, flush_utxos):
if self.height == self.db_height:
self.assert_flushed()
else:
await self.run_in_thread_with_lock(self._flush_body, flush_utxos)
await self.run_in_thread_with_lock(
self.flush_dbs, self.flush_data(), flush_utxos)
def _flush_body(self, flush_utxos):
'''Flush out cached state. UTXOs are flushed if flush_utxos.'''
# Try to estimate how many txs there are to go
daemon_height = self.daemon.cached_height()
coin = self.coin
tail_count = daemon_height - max(self.height, coin.TX_COUNT_HEIGHT)
# Damp the initial enthusiasm
factor = max(2.0 - 0.9 * self.height / coin.TX_COUNT_HEIGHT, 1.0)
estimated_txs = (tail_count * coin.TX_PER_BLOCK +
max(coin.TX_COUNT - self.tx_count, 0)) * factor
self.flush_dbs(self.flush_data(), flush_utxos, estimated_txs)
async def _maybe_flush(self):
# If caught up, flush everything as client queries are
# performed on the DB.
if self._caught_up_event.is_set():
await self.flush(True)
elif time.time() > self.next_cache_check:
flush_arg = self.check_cache_size()
if flush_arg is not None:
await self.flush(flush_arg)
self.next_cache_check = time.time() + 30
def check_cache_size(self):
'''Flush a cache if it gets too big.'''
@ -448,7 +435,7 @@ class BlockProcessor(DB):
The blocks should be in order of decreasing height, starting at.
self.height. A flush is performed once the blocks are backed up.
'''
self.assert_flushed()
self.assert_flushed(self.flush_data())
assert self.height >= len(raw_blocks)
coin = self.coin

View File

@ -168,9 +168,25 @@ class DB(object):
return await self.header_mc.branch_and_root(length, height)
# Flushing
def flush_dbs(self, flush_data, flush_utxos, estimated_txs):
def assert_flushed(self, flush_data):
'''Asserts state is fully flushed.'''
assert flush_data.tx_count == self.fs_tx_count == self.db_tx_count
assert flush_data.height == self.fs_height == self.db_height
assert flush_data.tip == self.tip
assert not flush_data.headers
assert not flush_data.block_tx_hashes
assert not flush_data.adds
assert not flush_data.deletes
assert not flush_data.undo_infos
self.history.assert_flushed()
def flush_dbs(self, flush_data, flush_utxos):
'''Flush out cached state. History is always flushed; UTXOs are
flushed if flush_utxos.'''
if flush_data.height == self.db_height:
self.assert_flushed(flush_data)
return
start_time = time.time()
prior_flush = self.last_flush
tx_delta = flush_data.tx_count - self.last_flush_tx_count
@ -201,7 +217,7 @@ class DB(object):
flush_interval = self.last_flush - prior_flush
tx_per_sec_gen = int(flush_data.tx_count / self.wall_time)
tx_per_sec_last = 1 + int(tx_delta / flush_interval)
eta = estimated_txs / tx_per_sec_last
eta = self.estimate_txs_remaining() / tx_per_sec_last
self.logger.info(f'tx/sec since genesis: {tx_per_sec_gen:,d}, '
f'since last flush: {tx_per_sec_last:,d}')
self.logger.info(f'sync time: {formatted_time(self.wall_time)} '
@ -323,12 +339,6 @@ class DB(object):
f'{elapsed:.1f}s. Height {flush_data.height:,d} '
f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')
def db_assert_flushed(self, to_tx_count, to_height):
'''Asserts state is fully flushed.'''
assert to_tx_count == self.fs_tx_count == self.db_tx_count
assert to_height == self.fs_height == self.db_height
self.history.assert_flushed()
def fs_update_header_offsets(self, offset_start, height_start, headers):
if self.coin.STATIC_BLOCK_HEADERS:
return