Move catch-up stats to db.py
This commit is contained in:
parent
891730e78f
commit
dc445e2a54
@ -20,8 +20,8 @@ from aiorpcx import TaskGroup, run_in_thread
|
||||
import electrumx
|
||||
from electrumx.server.daemon import DaemonError
|
||||
from electrumx.lib.hash import hash_to_hex_str, HASHX_LEN
|
||||
from electrumx.lib.util import chunks, formatted_time, class_logger
|
||||
from electrumx.server.db import DB, FlushData
|
||||
from electrumx.lib.util import chunks, class_logger
|
||||
from electrumx.server.db import FlushData
|
||||
|
||||
|
||||
class Prefetcher(object):
|
||||
@ -312,6 +312,8 @@ class BlockProcessor(DB):
|
||||
|
||||
return start, count
|
||||
|
||||
# - Flushing
|
||||
|
||||
def assert_flushed(self):
|
||||
'''Asserts state is fully flushed.'''
|
||||
assert not self.undo_infos
|
||||
@ -319,6 +321,11 @@ class BlockProcessor(DB):
|
||||
assert not self.db_deletes
|
||||
self.db_assert_flushed(self.tx_count, self.height)
|
||||
|
||||
def flush_data(self):
|
||||
return FlushData(self.height, self.tx_count, self.headers,
|
||||
self.tx_hashes, self.undo_infos, self.utxo_cache,
|
||||
self.db_deletes, self.tip)
|
||||
|
||||
async def flush_for_backup(self):
|
||||
# self.touched can include other addresses which is
|
||||
# harmless, but remove None.
|
||||
@ -335,17 +342,18 @@ class BlockProcessor(DB):
|
||||
else:
|
||||
await self.run_in_thread_with_lock(self._flush_body, flush_utxos)
|
||||
|
||||
def flush_data(self):
|
||||
return FlushData(self.height, self.tx_count, self.headers,
|
||||
self.tx_hashes, self.undo_infos, self.utxo_cache,
|
||||
self.db_deletes, self.tip)
|
||||
|
||||
def _flush_body(self, flush_utxos):
|
||||
'''Flush out cached state. UTXOs are flushed if flush_utxos.'''
|
||||
last_flush = self.last_flush
|
||||
tx_diff = self.tx_count - self.last_flush_tx_count
|
||||
# Try to estimate how many txs there are to go
|
||||
daemon_height = self.daemon.cached_height()
|
||||
coin = self.coin
|
||||
tail_count = daemon_height - max(self.height, coin.TX_COUNT_HEIGHT)
|
||||
# Damp the initial enthusiasm
|
||||
factor = max(2.0 - 0.9 * self.height / coin.TX_COUNT_HEIGHT, 1.0)
|
||||
estimated_txs = (tail_count * coin.TX_PER_BLOCK +
|
||||
max(coin.TX_COUNT - self.tx_count, 0)) * factor
|
||||
|
||||
self.flush_dbs(self.flush_data(), flush_utxos)
|
||||
self.flush_dbs(self.flush_data(), flush_utxos, estimated_txs)
|
||||
self.tx_hashes = []
|
||||
self.headers = []
|
||||
if flush_utxos:
|
||||
@ -353,30 +361,6 @@ class BlockProcessor(DB):
|
||||
self.utxo_cache = {}
|
||||
self.undo_infos = []
|
||||
|
||||
# Catch-up stats
|
||||
if self.utxo_db.for_sync:
|
||||
tx_per_sec = int(self.tx_count / self.wall_time)
|
||||
this_tx_per_sec = 1 + int(tx_diff / (self.last_flush - last_flush))
|
||||
self.logger.info('tx/sec since genesis: {:,d}, '
|
||||
'since last flush: {:,d}'
|
||||
.format(tx_per_sec, this_tx_per_sec))
|
||||
|
||||
daemon_height = self.daemon.cached_height()
|
||||
if self.height > self.coin.TX_COUNT_HEIGHT:
|
||||
tx_est = (daemon_height - self.height) * self.coin.TX_PER_BLOCK
|
||||
else:
|
||||
tx_est = ((daemon_height - self.coin.TX_COUNT_HEIGHT)
|
||||
* self.coin.TX_PER_BLOCK
|
||||
+ (self.coin.TX_COUNT - self.tx_count))
|
||||
|
||||
# Damp the enthusiasm
|
||||
realism = 2.0 - 0.9 * self.height / self.coin.TX_COUNT_HEIGHT
|
||||
tx_est *= max(realism, 1.0)
|
||||
|
||||
self.logger.info('sync time: {} ETA: {}'
|
||||
.format(formatted_time(self.wall_time),
|
||||
formatted_time(tx_est / this_tx_per_sec)))
|
||||
|
||||
def check_cache_size(self):
|
||||
'''Flush a cache if it gets too big.'''
|
||||
# Good average estimates based on traversal of subobjects and
|
||||
|
||||
@ -24,6 +24,7 @@ from aiorpcx import run_in_thread
|
||||
import electrumx.lib.util as util
|
||||
from electrumx.lib.hash import hash_to_hex_str, HASHX_LEN
|
||||
from electrumx.lib.merkle import Merkle, MerkleCache
|
||||
from electrumx.lib.util import formatted_time
|
||||
from electrumx.server.storage import db_class
|
||||
from electrumx.server.history import History
|
||||
|
||||
@ -167,10 +168,11 @@ class DB(object):
|
||||
return await self.header_mc.branch_and_root(length, height)
|
||||
|
||||
# Flushing
|
||||
def flush_dbs(self, flush_data, flush_utxos):
|
||||
def flush_dbs(self, flush_data, flush_utxos, estimated_txs):
|
||||
'''Flush out cached state. History is always flushed; UTXOs are
|
||||
flushed if flush_utxos.'''
|
||||
start_time = time.time()
|
||||
prior_flush = self.last_flush
|
||||
tx_delta = flush_data.tx_count - self.last_flush_tx_count
|
||||
|
||||
# Flush to file system
|
||||
@ -194,6 +196,17 @@ class DB(object):
|
||||
f'{elapsed:.1f}s. Height {flush_data.height:,d} '
|
||||
f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')
|
||||
|
||||
# Catch-up stats
|
||||
if self.utxo_db.for_sync:
|
||||
flush_interval = self.last_flush - prior_flush
|
||||
tx_per_sec_gen = int(flush_data.tx_count / self.wall_time)
|
||||
tx_per_sec_last = 1 + int(tx_delta / flush_interval)
|
||||
eta = estimated_txs / tx_per_sec_last
|
||||
self.logger.info(f'tx/sec since genesis: {tx_per_sec_gen:,d}, '
|
||||
f'since last flush: {tx_per_sec_last:,d}')
|
||||
self.logger.info(f'sync time: {formatted_time(self.wall_time)} '
|
||||
f'ETA: {formatted_time(eta)}')
|
||||
|
||||
def flush_fs(self, flush_data):
|
||||
'''Write headers, tx counts and block tx hashes to the filesystem.
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user