From 27b31746f8a11afe36577af8eb1b2feeae95eead Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 9 Aug 2018 14:13:09 +0900 Subject: [PATCH] Remove remaining flush-related logic to db.py --- electrumx/server/block_processor.py | 59 +++++++++++------------------ electrumx/server/db.py | 26 +++++++++---- 2 files changed, 41 insertions(+), 44 deletions(-) diff --git a/electrumx/server/block_processor.py b/electrumx/server/block_processor.py index c2ad2aa..2669526 100644 --- a/electrumx/server/block_processor.py +++ b/electrumx/server/block_processor.py @@ -187,17 +187,6 @@ class BlockProcessor(DB): return await run_in_thread(func, *args) return await asyncio.shield(run_in_thread_locked()) - async def _maybe_flush(self): - # If caught up, flush everything as client queries are - # performed on the DB. - if self._caught_up_event.is_set(): - await self.flush(True) - elif time.time() > self.next_cache_check: - flush_arg = self.check_cache_size() - if flush_arg is not None: - await self.flush(flush_arg) - self.next_cache_check = time.time() + 30 - async def check_and_advance_blocks(self, raw_blocks): '''Process the list of raw blocks passed. Detects and handles reorgs. @@ -316,38 +305,36 @@ class BlockProcessor(DB): return start, count + def estimate_txs_remaining(self): + # Try to estimate how many txs there are to go + daemon_height = self.daemon.cached_height() + coin = self.coin + tail_count = daemon_height - max(self.height, coin.TX_COUNT_HEIGHT) + # Damp the initial enthusiasm + realism = max(2.0 - 0.9 * self.height / coin.TX_COUNT_HEIGHT, 1.0) + return (tail_count * coin.TX_PER_BLOCK + + max(coin.TX_COUNT - self.tx_count, 0)) * realism + # - Flushing - - def assert_flushed(self): - '''Asserts state is fully flushed.''' - assert not self.undo_infos - assert not self.utxo_cache - assert not self.db_deletes - self.db_assert_flushed(self.tx_count, self.height) - def flush_data(self): return FlushData(self.height, self.tx_count, self.headers, self.tx_hashes, self.undo_infos, self.utxo_cache, self.db_deletes, self.tip) async def flush(self, flush_utxos): - if self.height == self.db_height: - self.assert_flushed() - else: - await self.run_in_thread_with_lock(self._flush_body, flush_utxos) + await self.run_in_thread_with_lock( + self.flush_dbs, self.flush_data(), flush_utxos) - def _flush_body(self, flush_utxos): - '''Flush out cached state. UTXOs are flushed if flush_utxos.''' - # Try to estimate how many txs there are to go - daemon_height = self.daemon.cached_height() - coin = self.coin - tail_count = daemon_height - max(self.height, coin.TX_COUNT_HEIGHT) - # Damp the initial enthusiasm - factor = max(2.0 - 0.9 * self.height / coin.TX_COUNT_HEIGHT, 1.0) - estimated_txs = (tail_count * coin.TX_PER_BLOCK + - max(coin.TX_COUNT - self.tx_count, 0)) * factor - - self.flush_dbs(self.flush_data(), flush_utxos, estimated_txs) + async def _maybe_flush(self): + # If caught up, flush everything as client queries are + # performed on the DB. + if self._caught_up_event.is_set(): + await self.flush(True) + elif time.time() > self.next_cache_check: + flush_arg = self.check_cache_size() + if flush_arg is not None: + await self.flush(flush_arg) + self.next_cache_check = time.time() + 30 def check_cache_size(self): '''Flush a cache if it gets too big.''' @@ -448,7 +435,7 @@ class BlockProcessor(DB): The blocks should be in order of decreasing height, starting at. self.height. A flush is performed once the blocks are backed up. ''' - self.assert_flushed() + self.assert_flushed(self.flush_data()) assert self.height >= len(raw_blocks) coin = self.coin diff --git a/electrumx/server/db.py b/electrumx/server/db.py index 3b7019d..ab57539 100644 --- a/electrumx/server/db.py +++ b/electrumx/server/db.py @@ -168,9 +168,25 @@ class DB(object): return await self.header_mc.branch_and_root(length, height) # Flushing - def flush_dbs(self, flush_data, flush_utxos, estimated_txs): + def assert_flushed(self, flush_data): + '''Asserts state is fully flushed.''' + assert flush_data.tx_count == self.fs_tx_count == self.db_tx_count + assert flush_data.height == self.fs_height == self.db_height + assert flush_data.tip == self.tip + assert not flush_data.headers + assert not flush_data.block_tx_hashes + assert not flush_data.adds + assert not flush_data.deletes + assert not flush_data.undo_infos + self.history.assert_flushed() + + def flush_dbs(self, flush_data, flush_utxos): '''Flush out cached state. History is always flushed; UTXOs are flushed if flush_utxos.''' + if flush_data.height == self.db_height: + self.assert_flushed(flush_data) + return + start_time = time.time() prior_flush = self.last_flush tx_delta = flush_data.tx_count - self.last_flush_tx_count @@ -201,7 +217,7 @@ class DB(object): flush_interval = self.last_flush - prior_flush tx_per_sec_gen = int(flush_data.tx_count / self.wall_time) tx_per_sec_last = 1 + int(tx_delta / flush_interval) - eta = estimated_txs / tx_per_sec_last + eta = self.estimate_txs_remaining() / tx_per_sec_last self.logger.info(f'tx/sec since genesis: {tx_per_sec_gen:,d}, ' f'since last flush: {tx_per_sec_last:,d}') self.logger.info(f'sync time: {formatted_time(self.wall_time)} ' @@ -323,12 +339,6 @@ class DB(object): f'{elapsed:.1f}s. Height {flush_data.height:,d} ' f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})') - def db_assert_flushed(self, to_tx_count, to_height): - '''Asserts state is fully flushed.''' - assert to_tx_count == self.fs_tx_count == self.db_tx_count - assert to_height == self.fs_height == self.db_height - self.history.assert_flushed() - def fs_update_header_offsets(self, offset_start, height_start, headers): if self.coin.STATIC_BLOCK_HEADERS: return