From 2001d5c4f4f1bf3d69f05f60fbc08234b97e36f0 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 20 Oct 2016 19:05:39 +0900 Subject: [PATCH] Merge the DB and BlockProcessor classes --- server/block_processor.py | 90 +++++++++++++++++---------------------- server/controller.py | 25 ++++++++--- server/protocol.py | 13 ++---- 3 files changed, 62 insertions(+), 66 deletions(-) diff --git a/server/block_processor.py b/server/block_processor.py index ea9afd1..7f1575c 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -25,6 +25,10 @@ def formatted_time(t): t // 86400, (t % 86400) // 3600, (t % 3600) // 60, t % 60) +class ChainError(Exception): + pass + + class Prefetcher(LoggedClass): '''Prefetches blocks (in the forward direction only).''' @@ -92,46 +96,10 @@ class BlockProcessor(LoggedClass): Coordinate backing up in case of chain reorganisations. ''' - def __init__(self, db, daemon): + def __init__(self, env, daemon): super().__init__() - self.db = db + self.daemon = daemon - self.prefetcher = Prefetcher(daemon, db.height) - - def coros(self): - return [self.start(), self.prefetcher.start()] - - def flush_db(self): - self.db.flush(self.daemon.cached_height(), True) - - async def start(self): - '''Loop forever processing blocks in the appropriate direction.''' - try: - while True: - blocks = await self.prefetcher.get_blocks() - for block in blocks: - self.db.process_block(block, self.daemon.cached_height()) - # Release asynchronous block fetching - await asyncio.sleep(0) - - if self.db.height == self.daemon.cached_height(): - self.logger.info('caught up to height {:d}' - .format(self.db_height)) - self.flush_db() - finally: - self.flush_db() - - -class DB(LoggedClass): - - class Error(Exception): - pass - - class ChainError(Exception): - pass - - def __init__(self, env): - super().__init__() # Meta self.utxo_MB = env.utxo_MB @@ -159,6 +127,7 @@ class DB(LoggedClass): self.history_size = 0 self.utxo_cache = UTXOCache(self, self.db, self.coin) self.fs_cache = FSCache(self.coin, self.height, self.tx_count) + self.prefetcher = Prefetcher(daemon, self.height) # Redirected member func self.get_tx_hash = self.fs_cache.get_tx_hash @@ -176,6 +145,26 @@ class DB(LoggedClass): self.logger.info('flushing history cache at {:,d} MB' .format(self.hist_MB)) + def coros(self): + return [self.start(), self.prefetcher.start()] + + async def start(self): + '''Loop forever processing blocks in the appropriate direction.''' + try: + while True: + blocks = await self.prefetcher.get_blocks() + for block in blocks: + self.process_block(block) + # Release asynchronous block fetching + await asyncio.sleep(0) + + if self.height == self.daemon.cached_height(): + self.logger.info('caught up to height {:d}' + .format(self_height)) + self.flush(True) + finally: + if self.daemon.cached_height() is not None: + self.flush(True) def open_db(self, coin): db_name = '{}-{}'.format(coin.NAME, coin.NET) @@ -198,7 +187,7 @@ class DB(LoggedClass): state = db.get(b'state') state = ast.literal_eval(state.decode()) if state['genesis'] != self.coin.GENESIS_HASH: - raise self.Error('DB genesis hash {} does not match coin {}' + raise ChainError('DB genesis hash {} does not match coin {}' .format(state['genesis_hash'], self.coin.GENESIS_HASH)) self.db_height = state['height'] @@ -215,7 +204,7 @@ class DB(LoggedClass): if diff == 0: return if diff < 0: - raise self.Error('DB corrupt: flush_count < utxo_flush_count') + raise ChainError('DB corrupt: flush_count < utxo_flush_count') self.logger.info('DB not shut down cleanly. Scanning for most ' 'recent {:,d} history flushes'.format(diff)) @@ -260,7 +249,7 @@ class DB(LoggedClass): self.db_tx_count = self.tx_count self.db_height = self.height - def flush(self, daemon_height, flush_utxos=False): + def flush(self, flush_utxos=False): '''Flush out cached state. History is always flushed. UTXOs are flushed if flush_utxos.''' @@ -291,6 +280,7 @@ class DB(LoggedClass): .format(self.flush_count, self.height, flush_time)) # Log handy stats + daemon_height = self.daemon.cached_height() txs_per_sec = int(self.tx_count / self.wall_time) this_txs_per_sec = 1 + int(tx_diff / (self.last_flush - last_flush)) if self.height > self.coin.TX_COUNT_HEIGHT: @@ -325,7 +315,7 @@ class DB(LoggedClass): self.history = defaultdict(partial(array.array, 'I')) self.history_size = 0 - def cache_sizes(self, daemon_height): + def cache_sizes(self): '''Returns the approximate size of the cache, in MB.''' # Good average estimates based on traversal of subobjects and # requesting size from Python (see deep_getsizeof). For @@ -339,7 +329,7 @@ class DB(LoggedClass): hist_MB = hist_cache_size // one_MB self.logger.info('cache stats at height {:,d} daemon height: {:,d}' - .format(self.height, daemon_height)) + .format(self.height, self.daemon.cached_height())) self.logger.info(' entries: UTXO: {:,d} DB: {:,d} ' 'hist addrs: {:,d} hist size: {:,d}' .format(len(self.utxo_cache.cache), @@ -350,16 +340,16 @@ class DB(LoggedClass): .format(utxo_MB + hist_MB, utxo_MB, hist_MB)) return utxo_MB, hist_MB - def process_block(self, block, daemon_height): + def process_block(self, block): # We must update the fs_cache before calling process_tx() as # it uses the fs_cache for tx hash lookup header, tx_hashes, txs = self.fs_cache.process_block(block) prev_hash, header_hash = self.coin.header_hashes(header) if prev_hash != self.tip: - raise self.ChainError('trying to build header with prev_hash {} ' - 'on top of tip with hash {}' - .format(hash_to_str(prev_hash), - hash_to_str(self.tip))) + raise ChainError('trying to build header with prev_hash {} ' + 'on top of tip with hash {}' + .format(hash_to_str(prev_hash), + hash_to_str(self.tip))) self.tip = header_hash self.height += 1 @@ -370,9 +360,9 @@ class DB(LoggedClass): now = time.time() if now > self.next_cache_check: self.next_cache_check = now + 60 - utxo_MB, hist_MB = self.cache_sizes(daemon_height) + utxo_MB, hist_MB = self.cache_sizes() if utxo_MB >= self.utxo_MB or hist_MB >= self.hist_MB: - self.flush(daemon_height, utxo_MB >= self.utxo_MB) + self.flush(utxo_MB >= self.utxo_MB) def process_tx(self, tx_hash, tx): cache = self.utxo_cache diff --git a/server/controller.py b/server/controller.py index 7eb3d4c..d2e0451 100644 --- a/server/controller.py +++ b/server/controller.py @@ -7,7 +7,7 @@ import traceback from functools import partial from server.daemon import Daemon, DaemonError -from server.block_processor import BlockProcessor, DB +from server.block_processor import BlockProcessor from server.protocol import ElectrumX, LocalRPC from lib.hash import (sha256, double_sha256, hash_to_str, Base58, hex_str_to_hash) @@ -24,9 +24,8 @@ class Controller(LoggedClass): super().__init__() self.loop = loop self.env = env - self.db = DB(env) self.daemon = Daemon(env.daemon_url) - self.block_processor = BlockProcessor(self.db, self.daemon) + self.block_processor = BlockProcessor(env, self.daemon) self.servers = [] self.sessions = set() self.addresses = {} @@ -112,10 +111,9 @@ class Controller(LoggedClass): '''Returns status as 32 bytes.''' status = self.addresses.get(hash168) if status is None: - status = ''.join( - '{}:{:d}:'.format(hash_to_str(tx_hash), height) - for tx_hash, height in self.db.get_history(hash168) - ) + history = self.block_processor.get_history(hash168) + status = ''.join('{}:{:d}:'.format(hash_to_str(tx_hash), height) + for tx_hash, height in history) if status: status = sha256(status.encode()) self.addresses[hash168] = status @@ -148,3 +146,16 @@ class Controller(LoggedClass): '''Returns a dictionary of IRC nick to (ip, host, ports) tuples, one per peer.''' return self.peers + + def height(self): + return self.block_processor.height + + def get_current_header(self): + return self.block_processor.get_current_header() + + def get_history(self, hash168): + history = self.block_processor.get_history(hash168, limit=None) + return [ + {'tx_hash': hash_to_str(tx_hash), 'height': height} + for tx_hash, height in history + ] diff --git a/server/protocol.py b/server/protocol.py index a20f08c..47a2561 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -102,9 +102,8 @@ class JSONRPC(asyncio.Protocol, LoggedClass): class ElectrumX(JSONRPC): '''A TCP server that handles incoming Electrum connections.''' - def __init__(self, controller, db, daemon, env): + def __init__(self, controller, daemon, env): super().__init__(controller) - self.db = db self.daemon = daemon self.env = env self.addresses = set() @@ -123,11 +122,7 @@ class ElectrumX(JSONRPC): async def handle_blockchain_address_get_history(self, params): hash168 = self.params_to_hash168(params) - history = [ - {'tx_hash': hash_to_str(tx_hash), 'height': height} - for tx_hash, height in self.db.get_history(hash168, limit=None) - ] - return history + return self.controller.get_history(hash168) async def handle_blockchain_address_subscribe(self, params): hash168 = self.params_to_hash168(params) @@ -140,7 +135,7 @@ class ElectrumX(JSONRPC): async def handle_blockchain_headers_subscribe(self, params): self.subscribe_headers = True - return self.db.get_current_header() + return self.controller.get_current_header() async def handle_blockchain_relayfee(self, params): '''The minimum fee a low-priority tx must pay in order to be accepted @@ -201,7 +196,7 @@ class LocalRPC(JSONRPC): async def handle_getinfo(self, params): return { - 'blocks': self.controller.db.height, + 'blocks': self.controller.height(), 'peers': len(self.controller.get_peers()), 'sessions': len(self.controller.sessions), 'watched': sum(len(s.addresses) for s in self.controller.sessions