From dc445e2a54ee7cbb0f6676677d13825cd6461730 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 9 Aug 2018 13:47:54 +0900 Subject: [PATCH] Move catch-up stats to db.py --- electrumx/server/block_processor.py | 52 ++++++++++------------------- electrumx/server/db.py | 15 ++++++++- 2 files changed, 32 insertions(+), 35 deletions(-) diff --git a/electrumx/server/block_processor.py b/electrumx/server/block_processor.py index 457d30b..eb71d17 100644 --- a/electrumx/server/block_processor.py +++ b/electrumx/server/block_processor.py @@ -20,8 +20,8 @@ from aiorpcx import TaskGroup, run_in_thread import electrumx from electrumx.server.daemon import DaemonError from electrumx.lib.hash import hash_to_hex_str, HASHX_LEN -from electrumx.lib.util import chunks, formatted_time, class_logger -from electrumx.server.db import DB, FlushData +from electrumx.lib.util import chunks, class_logger +from electrumx.server.db import FlushData class Prefetcher(object): @@ -312,6 +312,8 @@ class BlockProcessor(DB): return start, count + # - Flushing + def assert_flushed(self): '''Asserts state is fully flushed.''' assert not self.undo_infos @@ -319,6 +321,11 @@ class BlockProcessor(DB): assert not self.db_deletes self.db_assert_flushed(self.tx_count, self.height) + def flush_data(self): + return FlushData(self.height, self.tx_count, self.headers, + self.tx_hashes, self.undo_infos, self.utxo_cache, + self.db_deletes, self.tip) + async def flush_for_backup(self): # self.touched can include other addresses which is # harmless, but remove None. @@ -335,17 +342,18 @@ class BlockProcessor(DB): else: await self.run_in_thread_with_lock(self._flush_body, flush_utxos) - def flush_data(self): - return FlushData(self.height, self.tx_count, self.headers, - self.tx_hashes, self.undo_infos, self.utxo_cache, - self.db_deletes, self.tip) - def _flush_body(self, flush_utxos): '''Flush out cached state. UTXOs are flushed if flush_utxos.''' - last_flush = self.last_flush - tx_diff = self.tx_count - self.last_flush_tx_count + # Try to estimate how many txs there are to go + daemon_height = self.daemon.cached_height() + coin = self.coin + tail_count = daemon_height - max(self.height, coin.TX_COUNT_HEIGHT) + # Damp the initial enthusiasm + factor = max(2.0 - 0.9 * self.height / coin.TX_COUNT_HEIGHT, 1.0) + estimated_txs = (tail_count * coin.TX_PER_BLOCK + + max(coin.TX_COUNT - self.tx_count, 0)) * factor - self.flush_dbs(self.flush_data(), flush_utxos) + self.flush_dbs(self.flush_data(), flush_utxos, estimated_txs) self.tx_hashes = [] self.headers = [] if flush_utxos: @@ -353,30 +361,6 @@ class BlockProcessor(DB): self.utxo_cache = {} self.undo_infos = [] - # Catch-up stats - if self.utxo_db.for_sync: - tx_per_sec = int(self.tx_count / self.wall_time) - this_tx_per_sec = 1 + int(tx_diff / (self.last_flush - last_flush)) - self.logger.info('tx/sec since genesis: {:,d}, ' - 'since last flush: {:,d}' - .format(tx_per_sec, this_tx_per_sec)) - - daemon_height = self.daemon.cached_height() - if self.height > self.coin.TX_COUNT_HEIGHT: - tx_est = (daemon_height - self.height) * self.coin.TX_PER_BLOCK - else: - tx_est = ((daemon_height - self.coin.TX_COUNT_HEIGHT) - * self.coin.TX_PER_BLOCK - + (self.coin.TX_COUNT - self.tx_count)) - - # Damp the enthusiasm - realism = 2.0 - 0.9 * self.height / self.coin.TX_COUNT_HEIGHT - tx_est *= max(realism, 1.0) - - self.logger.info('sync time: {} ETA: {}' - .format(formatted_time(self.wall_time), - formatted_time(tx_est / this_tx_per_sec))) - def check_cache_size(self): '''Flush a cache if it gets too big.''' # Good average estimates based on traversal of subobjects and diff --git a/electrumx/server/db.py b/electrumx/server/db.py index 861845c..d64183d 100644 --- a/electrumx/server/db.py +++ b/electrumx/server/db.py @@ -24,6 +24,7 @@ from aiorpcx import run_in_thread import electrumx.lib.util as util from electrumx.lib.hash import hash_to_hex_str, HASHX_LEN from electrumx.lib.merkle import Merkle, MerkleCache +from electrumx.lib.util import formatted_time from electrumx.server.storage import db_class from electrumx.server.history import History @@ -167,10 +168,11 @@ class DB(object): return await self.header_mc.branch_and_root(length, height) # Flushing - def flush_dbs(self, flush_data, flush_utxos): + def flush_dbs(self, flush_data, flush_utxos, estimated_txs): '''Flush out cached state. History is always flushed; UTXOs are flushed if flush_utxos.''' start_time = time.time() + prior_flush = self.last_flush tx_delta = flush_data.tx_count - self.last_flush_tx_count # Flush to file system @@ -194,6 +196,17 @@ class DB(object): f'{elapsed:.1f}s. Height {flush_data.height:,d} ' f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})') + # Catch-up stats + if self.utxo_db.for_sync: + flush_interval = self.last_flush - prior_flush + tx_per_sec_gen = int(flush_data.tx_count / self.wall_time) + tx_per_sec_last = 1 + int(tx_delta / flush_interval) + eta = estimated_txs / tx_per_sec_last + self.logger.info(f'tx/sec since genesis: {tx_per_sec_gen:,d}, ' + f'since last flush: {tx_per_sec_last:,d}') + self.logger.info(f'sync time: {formatted_time(self.wall_time)} ' + f'ETA: {formatted_time(eta)}') + def flush_fs(self, flush_data): '''Write headers, tx counts and block tx hashes to the filesystem.