diff --git a/server/block_processor.py b/server/block_processor.py index 3e8e2f8..c3f16aa 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -23,14 +23,6 @@ from lib.util import chunks, formatted_time, LoggedClass import server.db -# Tasks placed on task queue -BLOCKS, CAUGHT_UP = range(2) - - -class ChainError(Exception): - pass - - class Prefetcher(LoggedClass): '''Prefetches blocks (in the forward direction only).''' @@ -81,12 +73,10 @@ class Prefetcher(LoggedClass): try: secs = 0 if self.cache_size < self.target_cache_size: - if await self._prefetch(): - self.tasks.put_nowait(BLOCKS) - else: - self.tasks.put_nowait(CAUGHT_UP) + if not await self._prefetch(): self.caught_up = True secs = 5 + self.tasks.put_nowait(None) await asyncio.sleep(secs) except DaemonError as e: self.logger.info('ignoring daemon error: {}'.format(e)) @@ -128,6 +118,9 @@ class Prefetcher(LoggedClass): return count +class ChainError(Exception): + '''Raised on error processing blocks.''' + class ChainReorg(Exception): '''Raised on a blockchain reorganisation.''' @@ -194,19 +187,18 @@ class BlockProcessor(server.db.DB): if self.env.force_reorg > 0: self.logger.info('DEBUG: simulating reorg of {:,d} blocks' .format(self.env.force_reorg)) - await self.handle_chain_reorg(self.env.force_reorg, set()) + await self.handle_chain_reorg(set(), self.env.force_reorg) while True: task = await self.tasks.get() if self._shutdown: break - if task == BLOCKS: - await self.advance_blocks() - else: - assert task == CAUGHT_UP - if not self.caught_up: - self.caught_up = True - self.first_caught_up() + blocks = self.prefetcher.get_blocks() + if blocks: + await self.advance_blocks(blocks) + elif not self.caught_up: + self.caught_up = True + self.first_caught_up() self.flush(True) @@ -214,13 +206,9 @@ class BlockProcessor(server.db.DB): '''Call to shut down the block processor.''' self.logger.info('flushing state to DB for clean shutdown...') self._shutdown = True - # Ensure we don't sit waiting for a task - self.tasks.put_nowait(BLOCKS) - - async def advance_blocks(self): - '''Take blocks from the prefetcher and process them.''' - blocks = self.prefetcher.get_blocks() + self.tasks.put_nowait(None) + async def advance_blocks(self, blocks): '''Strip the unspendable genesis coinbase.''' if self.height == -1: blocks[0] = blocks[0][:self.coin.HEADER_LEN] + bytes(1) @@ -231,7 +219,7 @@ class BlockProcessor(server.db.DB): self.advance_block(block, touched) await asyncio.sleep(0) # Yield except ChainReorg: - await self.handle_chain_reorg(None, touched) + await self.handle_chain_reorg(touched) if self.caught_up: # Flush everything as queries are performed on the DB and @@ -243,18 +231,17 @@ class BlockProcessor(server.db.DB): self.next_cache_check = time.time() + 60 def first_caught_up(self): - '''Called when first caught up after start, or after a reorg.''' + '''Called when first caught up after starting.''' + self.flush(True) if self.first_sync: - self.first_sync = False self.logger.info('{} synced to height {:,d}' .format(VERSION, self.height)) - self.flush(True) + self.first_sync = False + self.flush_state(self.db) self.reopen_db(False) - else: - self.flush(True) self.event.set() - async def handle_chain_reorg(self, count, touched): + async def handle_chain_reorg(self, touched, count=None): '''Handle a chain reorganisation. Count is the number of blocks to simulate a reorg, or None for