From 967b2de60d08f720323c969ff2d78b1d8ffd88ad Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 9 Aug 2018 14:42:17 +0900 Subject: [PATCH] Separate the block processor from the DB - BP no longer inherits from the DB, but is passed it --- electrumx/server/block_processor.py | 61 +++++++++++++++-------------- electrumx/server/chain_state.py | 20 +++++----- electrumx/server/controller.py | 10 +++-- electrumx/server/db.py | 8 ++-- 4 files changed, 52 insertions(+), 47 deletions(-) diff --git a/electrumx/server/block_processor.py b/electrumx/server/block_processor.py index 2669526..6c6301f 100644 --- a/electrumx/server/block_processor.py +++ b/electrumx/server/block_processor.py @@ -142,21 +142,23 @@ class ChainError(Exception): '''Raised on error processing blocks.''' -class BlockProcessor(DB): +class BlockProcessor(object): '''Process blocks and update the DB state to match. Employ a prefetcher to prefetch blocks in batches for processing. Coordinate backing up in case of chain reorganisations. ''' - def __init__(self, env, daemon, notifications): - super().__init__(env) - + def __init__(self, env, db, daemon, notifications): + self.env = env + self.db = db self.daemon = daemon self.notifications = notifications + self.coin = env.coin self.blocks_event = asyncio.Event() self.prefetcher = Prefetcher(daemon, env.coin, self.blocks_event) + self.logger = class_logger(__name__, self.__class__.__name__) # Meta self.next_cache_check = 0 @@ -204,7 +206,7 @@ class BlockProcessor(DB): start = time.time() await self.run_in_thread_with_lock(self.advance_blocks, blocks) await self._maybe_flush() - if not self.first_sync: + if not self.db.first_sync: s = '' if len(blocks) == 1 else 's' self.logger.info('processed {:,d} block{} in {:.1f}s' .format(len(blocks), s, @@ -254,7 +256,7 @@ class BlockProcessor(DB): # harmless, but remove None. self.touched.discard(None) await self.run_in_thread_with_lock( - self.flush_backup, self.flush_data(), self.touched) + self.db.flush_backup, self.flush_data(), self.touched) last -= len(raw_blocks) await self.prefetcher.reset_height(self.height) @@ -271,7 +273,7 @@ class BlockProcessor(DB): self.logger.info(f'chain was reorganised replacing {count:,d} ' f'block{s} at heights {start:,d}-{last:,d}') - return start, last, await self.fs_block_hashes(start, count) + return start, last, await self.db.fs_block_hashes(start, count) async def calc_reorg_range(self, count): '''Calculate the reorg range''' @@ -289,7 +291,7 @@ class BlockProcessor(DB): start = self.height - 1 count = 1 while start > 0: - hashes = await self.fs_block_hashes(start, count) + hashes = await self.db.fs_block_hashes(start, count) hex_hashes = [hash_to_hex_str(hash) for hash in hashes] d_hex_hashes = await self.daemon.block_hex_hashes(start, count) n = diff_pos(hex_hashes, d_hex_hashes) @@ -323,7 +325,8 @@ class BlockProcessor(DB): async def flush(self, flush_utxos): await self.run_in_thread_with_lock( - self.flush_dbs, self.flush_data(), flush_utxos) + self.db.flush_dbs, self.flush_data(), flush_utxos, + self.estimate_txs_remaining) async def _maybe_flush(self): # If caught up, flush everything as client queries are @@ -343,10 +346,10 @@ class BlockProcessor(DB): one_MB = 1000*1000 utxo_cache_size = len(self.utxo_cache) * 205 db_deletes_size = len(self.db_deletes) * 57 - hist_cache_size = self.history.unflushed_memsize() + hist_cache_size = self.db.history.unflushed_memsize() # Roughly ntxs * 32 + nblocks * 42 - tx_hash_size = ((self.tx_count - self.fs_tx_count) * 32 - + (self.height - self.fs_height) * 42) + tx_hash_size = ((self.tx_count - self.db.fs_tx_count) * 32 + + (self.height - self.db.fs_height) * 42) utxo_MB = (db_deletes_size + utxo_cache_size) // one_MB hist_MB = (hist_cache_size + tx_hash_size) // one_MB @@ -367,7 +370,7 @@ class BlockProcessor(DB): It is already verified they correctly connect onto our tip. ''' - min_height = self.min_undo_height(self.daemon.cached_height()) + min_height = self.db.min_undo_height(self.daemon.cached_height()) height = self.height for block in blocks: @@ -375,7 +378,7 @@ class BlockProcessor(DB): undo_info = self.advance_txs(block.transactions) if height >= min_height: self.undo_infos.append((undo_info, height)) - self.write_raw_block(block.raw, height) + self.db.write_raw_block(block.raw, height) headers = [block.header for block in blocks] self.height = height @@ -422,10 +425,10 @@ class BlockProcessor(DB): update_touched(hashXs) tx_num += 1 - self.history.add_unflushed(hashXs_by_tx, self.tx_count) + self.db.history.add_unflushed(hashXs_by_tx, self.tx_count) self.tx_count = tx_num - self.tx_counts.append(tx_num) + self.db.tx_counts.append(tx_num) return undo_info @@ -435,7 +438,7 @@ class BlockProcessor(DB): The blocks should be in order of decreasing height, starting at. self.height. A flush is performed once the blocks are backed up. ''' - self.assert_flushed(self.flush_data()) + self.db.assert_flushed(self.flush_data()) assert self.height >= len(raw_blocks) coin = self.coin @@ -451,14 +454,14 @@ class BlockProcessor(DB): self.tip = coin.header_prevhash(block.header) self.backup_txs(block.transactions) self.height -= 1 - self.tx_counts.pop() + self.db.tx_counts.pop() self.logger.info('backed up to height {:,d}'.format(self.height)) def backup_txs(self, txs): # Prevout values, in order down the block (coinbase first if present) # undo_info is in reverse block order - undo_info = self.read_undo_info(self.height) + undo_info = self.db.read_undo_info(self.height) if undo_info is None: raise ChainError('no undo information found for height {:,d}' .format(self.height)) @@ -566,14 +569,14 @@ class BlockProcessor(DB): # Value: hashX prefix = b'h' + tx_hash[:4] + idx_packed candidates = {db_key: hashX for db_key, hashX - in self.utxo_db.iterator(prefix=prefix)} + in self.db.utxo_db.iterator(prefix=prefix)} for hdb_key, hashX in candidates.items(): tx_num_packed = hdb_key[-4:] if len(candidates) > 1: tx_num, = unpack('False state. - first_sync = self.first_sync - self.first_sync = False + first_sync = self.db.first_sync + self.db.first_sync = False await self.flush(True) if first_sync: self.logger.info(f'{electrumx.version} synced to ' @@ -619,13 +622,13 @@ class BlockProcessor(DB): # Initialise the notification framework await self.notifications.on_block(set(), self.height) # Reopen for serving - await self.open_for_serving() + await self.db.open_for_serving() async def _first_open_dbs(self): - await self.open_for_sync() - self.height = self.db_height - self.tip = self.db_tip - self.tx_count = self.db_tx_count + await self.db.open_for_sync() + self.height = self.db.db_height + self.tip = self.db.db_tip + self.tx_count = self.db.db_tx_count # --- External API diff --git a/electrumx/server/chain_state.py b/electrumx/server/chain_state.py index 8e33830..135d42a 100644 --- a/electrumx/server/chain_state.py +++ b/electrumx/server/chain_state.py @@ -14,18 +14,18 @@ class ChainState(object): blocks, transaction history, UTXOs and the mempool. ''' - def __init__(self, env, daemon, bp): + def __init__(self, env, db, daemon, bp): self._env = env + self._db = db self._daemon = daemon - self._bp = bp # External interface pass-throughs for session.py - self.force_chain_reorg = self._bp.force_chain_reorg - self.tx_branch_and_root = self._bp.merkle.branch_and_root - self.read_headers = self._bp.read_headers - self.all_utxos = self._bp.all_utxos - self.limited_history = self._bp.limited_history - self.header_branch_and_root = self._bp.header_branch_and_root + self.force_chain_reorg = bp.force_chain_reorg + self.tx_branch_and_root = db.merkle.branch_and_root + self.read_headers = db.read_headers + self.all_utxos = db.all_utxos + self.limited_history = db.limited_history + self.header_branch_and_root = db.header_branch_and_root async def broadcast_transaction(self, raw_tx): return await self._daemon.sendrawtransaction([raw_tx]) @@ -34,7 +34,7 @@ class ChainState(object): return await getattr(self._daemon, method)(*args) def db_height(self): - return self._bp.db_height + return self._db.db_height def get_info(self): '''Chain state info for LocalRPC and logs.''' @@ -57,7 +57,7 @@ class ChainState(object): async def query(self, args, limit): coin = self._env.coin - db = self._bp + db = self._db lines = [] def arg_to_hashX(arg): diff --git a/electrumx/server/controller.py b/electrumx/server/controller.py index e3115c6..665d39c 100644 --- a/electrumx/server/controller.py +++ b/electrumx/server/controller.py @@ -13,6 +13,7 @@ import electrumx from electrumx.lib.server_base import ServerBase from electrumx.lib.util import version_string from electrumx.server.chain_state import ChainState +from electrumx.server.db import DB from electrumx.server.mempool import MemPool from electrumx.server.session import SessionManager @@ -93,10 +94,11 @@ class Controller(ServerBase): notifications = Notifications() daemon = env.coin.DAEMON(env) + db = DB(env) BlockProcessor = env.coin.BLOCK_PROCESSOR - bp = BlockProcessor(env, daemon, notifications) - mempool = MemPool(env.coin, daemon, notifications, bp.lookup_utxos) - chain_state = ChainState(env, daemon, bp) + bp = BlockProcessor(env, db, daemon, notifications) + mempool = MemPool(env.coin, daemon, notifications, db.lookup_utxos) + chain_state = ChainState(env, db, daemon, bp) session_mgr = SessionManager(env, chain_state, mempool, notifications, shutdown_event) @@ -108,7 +110,7 @@ class Controller(ServerBase): await group.spawn(session_mgr.serve(serve_externally_event)) await group.spawn(bp.fetch_and_process_blocks(caught_up_event)) await caught_up_event.wait() - await group.spawn(bp.populate_header_merkle_cache()) + await group.spawn(db.populate_header_merkle_cache()) await group.spawn(mempool.keep_synchronized(synchronized_event)) await synchronized_event.wait() serve_externally_event.set() diff --git a/electrumx/server/db.py b/electrumx/server/db.py index ab57539..6e82a52 100644 --- a/electrumx/server/db.py +++ b/electrumx/server/db.py @@ -158,7 +158,7 @@ class DB(object): async def populate_header_merkle_cache(self): self.logger.info('populating header merkle cache...') - length = max(1, self.height - self.env.reorg_limit) + length = max(1, self.db_height - self.env.reorg_limit) start = time.time() await self.header_mc.initialize(length) elapsed = time.time() - start @@ -172,7 +172,7 @@ class DB(object): '''Asserts state is fully flushed.''' assert flush_data.tx_count == self.fs_tx_count == self.db_tx_count assert flush_data.height == self.fs_height == self.db_height - assert flush_data.tip == self.tip + assert flush_data.tip == self.db_tip assert not flush_data.headers assert not flush_data.block_tx_hashes assert not flush_data.adds @@ -180,7 +180,7 @@ class DB(object): assert not flush_data.undo_infos self.history.assert_flushed() - def flush_dbs(self, flush_data, flush_utxos): + def flush_dbs(self, flush_data, flush_utxos, estimate_txs_remaining): '''Flush out cached state. History is always flushed; UTXOs are flushed if flush_utxos.''' if flush_data.height == self.db_height: @@ -217,7 +217,7 @@ class DB(object): flush_interval = self.last_flush - prior_flush tx_per_sec_gen = int(flush_data.tx_count / self.wall_time) tx_per_sec_last = 1 + int(tx_delta / flush_interval) - eta = self.estimate_txs_remaining() / tx_per_sec_last + eta = estimate_txs_remaining() / tx_per_sec_last self.logger.info(f'tx/sec since genesis: {tx_per_sec_gen:,d}, ' f'since last flush: {tx_per_sec_last:,d}') self.logger.info(f'sync time: {formatted_time(self.wall_time)} '