diff --git a/server/block_processor.py b/server/block_processor.py index dae96e7..94ca616 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -35,8 +35,8 @@ class Prefetcher(LoggedClass): def __init__(self, daemon, height): super().__init__() self.daemon = daemon + self.semaphore = asyncio.Semaphore() self.queue = asyncio.Queue() - self.queue_semaphore = asyncio.Semaphore() self.queue_size = 0 # Target cache size. Has little effect on sync time. self.target_cache_size = 10 * 1024 * 1024 @@ -49,13 +49,27 @@ class Prefetcher(LoggedClass): self.queue_size -= total_size return blocks + async def clear(self, height): + '''Clear prefetched blocks and restart from the given height. + + Used in blockchain reorganisations. This coroutine can be + called asynchronously to the _prefetch coroutine so we must + synchronize. + ''' + with await self.semaphore: + while not self.queue.empty(): + self.queue.get_nowait() + self.queue_size = 0 + self.fetched_height = height + async def start(self): - '''Loops forever polling for more blocks.''' + '''Loop forever polling for more blocks.''' self.logger.info('prefetching blocks...') while True: while self.queue_size < self.target_cache_size: try: - await self._prefetch() + with await self.semaphore: + await self._prefetch() except DaemonError as e: self.logger.info('ignoring daemon errors: {}'.format(e)) await asyncio.sleep(2) @@ -71,11 +85,11 @@ class Prefetcher(LoggedClass): max_count = min(daemon_height - self.fetched_height, 4000) count = min(max_count, self._prefill_count(self.target_cache_size)) first = self.fetched_height + 1 - hashes = await self.daemon.block_hex_hashes(first, count) - if not hashes: + hex_hashes = await self.daemon.block_hex_hashes(first, count) + if not hex_hashes: return - blocks = await self.daemon.raw_blocks(hashes) + blocks = await self.daemon.raw_blocks(hex_hashes) sizes = [len(block) for block in blocks] total_size = sum(sizes) self.queue.put_nowait((blocks, total_size)) @@ -149,34 
+163,83 @@ class BlockProcessor(LoggedClass): return [self.start(), self.prefetcher.start()] async def start(self): - '''Loop forever processing blocks in the appropriate direction.''' - try: - while True: - blocks = await self.prefetcher.get_blocks() - for block in blocks: - self.process_block(block) - # Release asynchronous block fetching - await asyncio.sleep(0) + '''External entry point for block processing. - if self.height == self.daemon.cached_height(): - self.logger.info('caught up to height {:d}' - .format(self_height)) - self.flush(True) + A simple wrapper that safely flushes the DB on clean + shutdown. + ''' + try: + await self.advance_blocks() finally: - if self.daemon.cached_height() is not None: - self.flush(True) + self.flush(True) + + async def advance_blocks(self): + '''Loop forever processing blocks in the forward direction.''' + caught_up = False + while True: + blocks = await self.prefetcher.get_blocks() + for block in blocks: + if not self.advance_block(block): + await self.handle_chain_reorg() + caught_up = False + break + await asyncio.sleep(0) # Yield + + if not caught_up and self.height == self.daemon.cached_height(): + caught_up = True + self.logger.info('caught up to height {:,d}' + .format(self.height)) + + async def handle_chain_reorg(self): + hashes = await self.reorg_hashes() + hex_hashes = [hash_to_str(hash) for hash in hashes] + blocks = await self.daemon.raw_blocks(hex_hashes) + for block in reversed(blocks): + self.backup_block(block) + await self.prefetcher.clear(self.height) + + async def reorg_hashes(self): + '''Return the list of hashes to back up because of a reorg. 
+ + The hashes are returned in order of increasing height.''' + def match_pos(hashes1, hashes2): + for n, (hash1, hash2) in enumerate(zip(hashes1, hashes2)): + if hash1 == hash2: + return n + return -1 + + self.logger.info('chain reorg detected; finding common height...') + + start = self.height - 1 + count = 1 + while True: + hashes = self.fs_cache.block_hashes(start, count) + d_hex_hashes = await self.daemon.block_hex_hashes(start, count) + d_hashes = [bytes.fromhex(hex_hash) for hex_hash in d_hex_hashes] + n = match_pos(hashes, d_hashes) + if n >= 0: + break + assert start > 0 + count = min(count * 2, start) + start -= count + + # Hashes differ from height 'start' + start += n + 1 + count = (self.height - start) + 1 + + self.logger.info('chain was reorganised for {:,d} blocks starting ' + 'at height {:,d}'.format(count, start)) + + return self.fs_cache.block_hashes(start, count) def open_db(self, coin): - block_size = 4 * 1024 db_name = '{}-{}'.format(coin.NAME, coin.NET) try: db = plyvel.DB(db_name, create_if_missing=False, - error_if_exists=False, compression=None, - block_size = block_size) + error_if_exists=False, compression=None) except: db = plyvel.DB(db_name, create_if_missing=True, - error_if_exists=True, compression=None, - block_size = block_size) + error_if_exists=True, compression=None) self.logger.info('created new database {}'.format(db_name)) self.flush_state(db) else: @@ -343,21 +406,19 @@ class BlockProcessor(LoggedClass): .format(utxo_MB + hist_MB, utxo_MB, hist_MB)) return utxo_MB, hist_MB - def process_block(self, block): - # We must update the fs_cache before calling process_tx() as - # it uses the fs_cache for tx hash lookup - header, tx_hashes, txs = self.fs_cache.process_block(block) + def advance_block(self, block): + # We must update the fs_cache before calling advance_txs() as + # the UTXO cache uses the fs_cache via get_tx_hash() to + # resolve compressed key collisions + header, tx_hashes, txs = self.coin.read_block(block) + 
self.fs_cache.advance_block(header, tx_hashes, txs) prev_hash, header_hash = self.coin.header_hashes(header) if prev_hash != self.tip: - raise ChainError('trying to build header with prev_hash {} ' - 'on top of tip with hash {}' - .format(hash_to_str(prev_hash), - hash_to_str(self.tip))) + return False self.tip = header_hash self.height += 1 - for tx_hash, tx in zip(tx_hashes, txs): - self.process_tx(tx_hash, tx) + self.advance_txs(tx_hashes, txs) # Check if we're getting full and time to flush? now = time.time() @@ -367,21 +428,31 @@ class BlockProcessor(LoggedClass): if utxo_MB >= self.utxo_MB or hist_MB >= self.hist_MB: self.flush(utxo_MB >= self.utxo_MB) - def process_tx(self, tx_hash, tx): + return True + + def advance_txs(self, tx_hashes, txs): cache = self.utxo_cache tx_num = self.tx_count - # Add the outputs as new UTXOs; spend the inputs - hash168s = cache.add_many(tx_hash, tx_num, tx.outputs) - if not tx.is_coinbase: - for txin in tx.inputs: - hash168s.add(cache.spend(txin.prevout)) + for tx_hash, tx in zip(tx_hashes, txs): + # Add the outputs as new UTXOs; spend the inputs + hash168s = cache.add_many(tx_hash, tx_num, tx.outputs) + if not tx.is_coinbase: + for txin in tx.inputs: + hash168s.add(cache.spend(txin.prevout)) - for hash168 in hash168s: - self.history[hash168].append(tx_num) - self.history_size += len(hash168s) + for hash168 in hash168s: + self.history[hash168].append(tx_num) + self.history_size += len(hash168s) + tx_num += 1 - self.tx_count += 1 + self.tx_count = tx_num + + def backup_block(self, block): + pass + + def undo_txs(self, tx_hashes, txs): + pass @staticmethod def resolve_limit(limit): diff --git a/server/cache.py b/server/cache.py index ae6c32f..315f1a4 100644 --- a/server/cache.py +++ b/server/cache.py @@ -9,8 +9,8 @@ from bisect import bisect_right from collections import namedtuple from lib.script import ScriptPubKey -from lib.util import LoggedClass -from lib.hash import hash_to_str +from lib.util import chunks, LoggedClass 
+from lib.hash import double_sha256, hash_to_str # History can hold approx. 65536 * HIST_ENTRIES_PER_KEY entries @@ -285,6 +285,8 @@ class FSCache(LoggedClass): self.headers_file = self.open_file('headers', is_new) self.txcount_file = self.open_file('txcount', is_new) + # tx_counts[N] has the cumulative number of txs at the end of + # height N. So tx_counts[0] is 1 - the genesis coinbase self.tx_counts = array.array('I') self.txcount_file.seek(0) self.tx_counts.fromfile(self.txcount_file, self.height + 1) @@ -302,33 +304,33 @@ class FSCache(LoggedClass): return open(filename, 'wb+') raise - return self.tx_counts[self.height] if self.tx_counts else 0 - - def process_block(self, block): - '''Process a new block and return (header, tx_hashes, txs)''' - assert len(self.tx_counts) == self.height + 1 + len(self.headers) - - triple = header, tx_hashes, txs = self.coin.read_block(block) + def advance_block(self, header, tx_hashes, txs): + '''Update the FS cache for a new block.''' + prior_tx_count = self.tx_counts[-1] if self.tx_counts else 0 # Cache the new header, tx hashes and cumulative tx count self.headers.append(header) self.tx_hashes.append(tx_hashes) - prior_tx_count = self.tx_counts[-1] if self.tx_counts else 0 self.tx_counts.append(prior_tx_count + len(txs)) - return triple + def backup_block(self, block): + '''Revert a block and return (header, tx_hashes, txs)''' + pass def flush(self, new_height, new_tx_count): - '''Flush the things stored on the filesystem.''' + '''Flush the things stored on the filesystem. 
+ The arguments are passed for sanity check assertions only.''' self.logger.info('flushing to file system') - block_count = len(self.headers) - assert self.height + block_count == new_height - assert len(self.tx_hashes) == block_count - assert len(self.tx_counts) == self.height + 1 + block_count - assert new_tx_count == self.tx_counts[-1] if self.tx_counts else 0 + blocks_done = len(self.headers) prior_tx_count = self.tx_counts[self.height] if self.height >= 0 else 0 - tx_diff = new_tx_count - prior_tx_count + cur_tx_count = self.tx_counts[-1] if self.tx_counts else 0 + txs_done = cur_tx_count - prior_tx_count + + assert self.height + blocks_done == new_height + assert cur_tx_count == new_tx_count + assert len(self.tx_hashes) == blocks_done + assert len(self.tx_counts) == new_height + 1 # First the headers headers = b''.join(self.headers) @@ -345,7 +347,7 @@ class FSCache(LoggedClass): # Finally the hashes hashes = memoryview(b''.join(itertools.chain(*self.tx_hashes))) assert len(hashes) % 32 == 0 - assert len(hashes) // 32 == tx_diff + assert len(hashes) // 32 == txs_done cursor = 0 file_pos = prior_tx_count * 32 while cursor < len(hashes): @@ -362,9 +364,9 @@ class FSCache(LoggedClass): self.tx_hashes = [] self.headers = [] - self.height += block_count + self.height += blocks_done - return tx_diff + return txs_done def read_headers(self, height, count): read_count = min(count, self.height + 1 - height) @@ -403,6 +405,11 @@ class FSCache(LoggedClass): return tx_hash, height + def block_hashes(self, height, count): + headers = self.read_headers(height, count) + hlen = self.coin.HEADER_LEN + return [double_sha256(header) for header in chunks(headers, hlen)] + def encode_header(self, height): if height < 0 or height > self.height + len(self.headers): raise Exception('no header information for height {:,d}' diff --git a/server/protocol.py b/server/protocol.py index 47a2561..04b4dcf 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -7,7 +7,6 @@ import json 
import traceback from functools import partial -from lib.hash import hash_to_str from lib.util import LoggedClass from server.version import VERSION