From 10840604936f753edd952de94b4104ca3e74b77b Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 17 Oct 2016 20:20:10 +0900 Subject: [PATCH] Clean up controller init, and sync on catch up --- server/controller.py | 109 +++++++++++++++++++++---------------------- server_main.py | 4 +- 2 files changed, 57 insertions(+), 56 deletions(-) diff --git a/server/controller.py b/server/controller.py index 58e33a1..cdd5e6e 100644 --- a/server/controller.py +++ b/server/controller.py @@ -54,23 +54,28 @@ class Controller(LoggedClass): .format(env.host, env.ssl_port)) coros = [ - self.block_cache.catch_up(), - self.block_cache.process_cache() + self.block_cache.prefetcher(), + self.block_cache.process_blocks(), ] - self.tasks = [asyncio.ensure_future(coro) for coro in coros] + for coro in coros: + asyncio.ensure_future(coro) # Signal handlers for signame in ('SIGINT', 'SIGTERM'): loop.add_signal_handler(getattr(signal, signame), - partial(self.on_signal, signame)) - - return self.tasks + partial(self.on_signal, loop, signame)) def stop(self): for server in self.servers: server.close() + def on_signal(self, loop, signame): + self.logger.warning('received {} signal, preparing to shut down' + .format(signame)) + for task in asyncio.Task.all_tasks(loop): + task.cancel() + def add_session(self, session): self.sessions.add(session) @@ -97,12 +102,6 @@ class Controller(LoggedClass): self.jobs = jobs await asyncio.sleep(5) - def on_signal(self, signame): - self.logger.warning('received {} signal, preparing to shut down' - .format(signame)) - for task in self.tasks: - task.cancel() - def address_status(self, hash168): '''Returns status as 32 bytes.''' status = self.addresses.get(hash168) @@ -122,70 +121,70 @@ class Controller(LoggedClass): per peer.''' return self.peers - class BlockCache(LoggedClass): - '''Requests blocks ahead of time from the daemon. Serves them - to the blockchain processor.''' + '''Requests and caches blocks ahead of time from the daemon. Serves + them to the blockchain processor. Coordinates backing up in case of + block chain reorganisations. + ''' def __init__(self, env, db): super().__init__() self.db = db self.daemon_url = env.daemon_url - # Cache target size is in MB. Has little effect on sync time. - self.cache_limit = 10 + # Target cache size. Has little effect on sync time. + self.target_cache_size = 10 * 1024 * 1024 self.daemon_height = 0 self.fetched_height = db.height - # Blocks stored in reverse order. Next block is at end of list. - self.blocks = [] - self.recent_sizes = [] - self.ave_size = 0 + self.queue = asyncio.Queue() + self.queue_size = 0 + self.recent_sizes = [0] self.logger.info('using daemon URL {}'.format(self.daemon_url)) - async def process_cache(self): - while True: - await asyncio.sleep(1) - while self.blocks: - self.db.process_block(self.blocks.pop(), self.daemon_height) - # Release asynchronous block fetching - await asyncio.sleep(0) - - async def catch_up(self): - self.logger.info('catching up, block cache limit {:d}MB...' - .format(self.cache_limit)) + def flush_db(self): + self.db.flush(self.daemon_height, True) + async def process_blocks(self): try: - while await self.maybe_prefill(): - await asyncio.sleep(1) - self.logger.info('caught up to height {:d}' - .format(self.daemon_height)) + while True: + blocks, total_size = await self.queue.get() + self.queue_size -= total_size + for block in blocks: + self.db.process_block(block, self.daemon_height) + # Release asynchronous block fetching + await asyncio.sleep(0) + + if self.db.height == self.daemon_height: + self.logger.info('caught up to height {:d}' + .format(self.daemon_height)) + self.flush_db() finally: - self.db.flush(self.daemon_height, True) + self.flush_db() + + async def prefetcher(self): + '''Loops forever polling for more blocks.''' + self.logger.info('prefetching blocks...') + while True: + await self.maybe_prefetch() + await asyncio.sleep(2) def cache_used(self): return sum(len(block) for block in self.blocks) def prefill_count(self, room): - count = 0 - if self.ave_size: - count = room // self.ave_size + ave_size = sum(self.recent_sizes) // len(self.recent_sizes) + count = room // ave_size if ave_size else 0 return max(count, 10) - async def maybe_prefill(self): - '''Returns False to stop. True to sleep a while for asynchronous - processing.''' - cache_limit = self.cache_limit * 1024 * 1024 - while True: - cache_used = self.cache_used() - if cache_used > cache_limit: - return True - + async def maybe_prefetch(self): + '''Prefetch blocks if there are any to prefetch.''' + while self.queue_size < self.target_cache_size: # Keep going by getting a whole new cache_limit of blocks self.daemon_height = await self.send_single('getblockcount') max_count = min(self.daemon_height - self.fetched_height, 4000) - count = min(max_count, self.prefill_count(cache_limit)) + count = min(max_count, self.prefill_count(self.target_cache_size)) if not count: - return False # Done catching up + break first = self.fetched_height + 1 param_lists = [[height] for height in range(first, first + count)] @@ -198,16 +197,16 @@ class BlockCache(LoggedClass): # Convert hex string to bytes blocks = [bytes.fromhex(block) for block in blocks] - # Reverse order and place at front of list - self.blocks = list(reversed(blocks)) + self.blocks + sizes = [len(block) for block in blocks] + total_size = sum(sizes) + self.queue.put_nowait((blocks, total_size)) + self.queue_size += total_size # Keep 50 most recent block sizes for fetch count estimation - sizes = [len(block) for block in blocks] self.recent_sizes.extend(sizes) excess = len(self.recent_sizes) - 50 if excess > 0: self.recent_sizes = self.recent_sizes[excess:] - self.ave_size = sum(self.recent_sizes) // len(self.recent_sizes) async def send_single(self, method, params=None): payload = {'method': method} diff --git a/server_main.py b/server_main.py index 379d382..a4a1248 100755 --- a/server_main.py +++ b/server_main.py @@ -26,7 +26,9 @@ def main_loop(): #loop.set_debug(True) controller = Controller(env) - tasks = controller.start(loop) + controller.start(loop) + + tasks = asyncio.Task.all_tasks(loop) try: loop.run_until_complete(asyncio.gather(*tasks)) except asyncio.CancelledError: