From c0a112f8ea3a9d5bc790fdc149f90b91b62dcfe3 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Tue, 8 Nov 2016 07:32:55 +0900 Subject: [PATCH] Split out part of block processor into db.py The part that doesn't actually do any block processing... --- server/block_processor.py | 124 ++------------------------------- server/cache.py | 8 +-- server/db.py | 142 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 151 insertions(+), 123 deletions(-) create mode 100644 server/db.py diff --git a/server/block_processor.py b/server/block_processor.py index 6116924..1412cbc 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -22,6 +22,7 @@ from server.daemon import DaemonError from lib.hash import hash_to_str from lib.tx import Deserializer from lib.util import chunks, LoggedClass +import server.db from server.storage import open_db @@ -33,9 +34,6 @@ def formatted_time(t): t // 86400, (t % 86400) // 3600, (t % 3600) // 60, t % 60) -UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value") - - class ChainError(Exception): pass @@ -283,7 +281,7 @@ class MemPool(LoggedClass): return value -class BlockProcessor(LoggedClass): +class BlockProcessor(server.db.DB): '''Process blocks and update the DB state to match. Employ a prefetcher to prefetch blocks in batches for processing. @@ -292,9 +290,8 @@ class BlockProcessor(LoggedClass): def __init__(self, env, daemon, on_update=None): '''on_update is awaitable, and called only when caught up with the - daemon and a new block arrives or the mempool is updated. - ''' - super().__init__() + daemon and a new block arrives or the mempool is updated.''' + super().__init__(env.coin, env.db_engine) self.daemon = daemon self.on_update = on_update @@ -305,39 +302,16 @@ class BlockProcessor(LoggedClass): self.utxo_MB = env.utxo_MB self.hist_MB = env.hist_MB self.next_cache_check = 0 - self.coin = env.coin self.reorg_limit = env.reorg_limit - # Open DB and metadata files. Record some of its state. - db_name = '{}-{}'.format(self.coin.NAME, self.coin.NET) - self.db = open_db(db_name, env.db_engine) - if self.db.is_new: - self.logger.info('created new {} database {}' - .format(env.db_engine, db_name)) - else: - self.logger.info('successfully opened {} database {}' - .format(env.db_engine, db_name)) - - self.init_state() - self.tx_count = self.db_tx_count - self.height = self.db_height - self.tip = self.db_tip - - # Caches to be flushed later. Headers and tx_hashes have one - # entry per block + # Headers and tx_hashes have one entry per block self.history = defaultdict(partial(array.array, 'I')) self.history_size = 0 - self.utxo_cache = UTXOCache(self, self.db, self.coin) - self.fs_cache = FSCache(self.coin, self.height, self.tx_count) self.prefetcher = Prefetcher(daemon, self.height) self.last_flush = time.time() self.last_flush_tx_count = self.tx_count - # Redirected member funcs - self.get_tx_hash = self.fs_cache.get_tx_hash - self.read_headers = self.fs_cache.read_headers - # Log state self.logger.info('{}/{} height: {:,d} tx count: {:,d} ' 'flush count: {:,d} utxo flush count: {:,d} ' @@ -451,30 +425,6 @@ class BlockProcessor(LoggedClass): return self.fs_cache.block_hashes(start, count) - def init_state(self): - if self.db.is_new: - self.db_height = -1 - self.db_tx_count = 0 - self.db_tip = b'\0' * 32 - self.flush_count = 0 - self.utxo_flush_count = 0 - self.wall_time = 0 - self.first_sync = True - else: - state = self.db.get(b'state') - state = ast.literal_eval(state.decode()) - if state['genesis'] != self.coin.GENESIS_HASH: - raise ChainError('DB genesis hash {} does not match coin {}' - .format(state['genesis_hash'], - self.coin.GENESIS_HASH)) - self.db_height = state['height'] - self.db_tx_count = state['tx_count'] - self.db_tip = state['tip'] - self.flush_count = state['flush_count'] - self.utxo_flush_count = state['utxo_flush_count'] - self.wall_time = state['wall_time'] - self.first_sync = state.get('first_sync', True) - def clean_db(self): '''Clean out stale DB items. @@ -839,13 +789,6 @@ class BlockProcessor(LoggedClass): assert n == 0 self.tx_count -= len(txs) - @staticmethod - def resolve_limit(limit): - if limit is None: - return -1 - assert isinstance(limit, int) and limit >= 0 - return limit - def mempool_transactions(self, hash168): '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool entries for the hash168. @@ -860,60 +803,3 @@ class BlockProcessor(LoggedClass): Can be positive or negative. ''' return self.mempool.value(hash168) - - def get_history(self, hash168, limit=1000): - '''Generator that returns an unpruned, sorted list of (tx_hash, - height) tuples of confirmed transactions that touched the address, - earliest in the blockchain first. Includes both spending and - receiving transactions. By default yields at most 1000 entries. - Set limit to None to get them all. - ''' - limit = self.resolve_limit(limit) - prefix = b'H' + hash168 - for key, hist in self.db.iterator(prefix=prefix): - a = array.array('I') - a.frombytes(hist) - for tx_num in a: - if limit == 0: - return - yield self.get_tx_hash(tx_num) - limit -= 1 - - def get_balance(self, hash168): - '''Returns the confirmed balance of an address.''' - return sum(utxo.value for utxo in self.get_utxos(hash168, limit=None)) - - def get_utxos(self, hash168, limit=1000): - '''Generator that yields all UTXOs for an address sorted in no - particular order. By default yields at most 1000 entries. - Set limit to None to get them all. - ''' - limit = self.resolve_limit(limit) - unpack = struct.unpack - prefix = b'u' + hash168 - for k, v in self.db.iterator(prefix=prefix): - (tx_pos,) = unpack('= 0 + return limit + + def get_history(self, hash168, limit=1000): + '''Generator that returns an unpruned, sorted list of (tx_hash, + height) tuples of confirmed transactions that touched the address, + earliest in the blockchain first. Includes both spending and + receiving transactions. By default yields at most 1000 entries. + Set limit to None to get them all. + ''' + limit = self._resolve_limit(limit) + prefix = b'H' + hash168 + for key, hist in self.db.iterator(prefix=prefix): + a = array.array('I') + a.frombytes(hist) + for tx_num in a: + if limit == 0: + return + yield self.get_tx_hash(tx_num) + limit -= 1 + + def get_balance(self, hash168): + '''Returns the confirmed balance of an address.''' + return sum(utxo.value for utxo in self.get_utxos(hash168, limit=None)) + + def get_utxos(self, hash168, limit=1000): + '''Generator that yields all UTXOs for an address sorted in no + particular order. By default yields at most 1000 entries. + Set limit to None to get them all. + ''' + limit = self._resolve_limit(limit) + unpack = struct.unpack + prefix = b'u' + hash168 + for k, v in self.db.iterator(prefix=prefix): + (tx_pos,) = unpack('