From 131601a7b2c9da4a53cb370d2d55a5bbb5dbd7e0 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 16 Jan 2017 21:13:01 +0900 Subject: [PATCH] Rework clean shutdown logic So that the main block processor future is cancellable. We wait for the executor and then flush anything unflushed. Resolves the rest of the second part of #100 --- server/block_processor.py | 20 ++++++-------------- server/controller.py | 32 ++++++++++++++++++-------------- server/db.py | 7 +++---- 3 files changed, 27 insertions(+), 32 deletions(-) diff --git a/server/block_processor.py b/server/block_processor.py index 157b5ab..6efabbb 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -151,7 +151,6 @@ class BlockProcessor(server.db.DB): self.caught_up_event = asyncio.Event() self.task_queue = asyncio.Queue() - self.stop = False # Meta self.cache_MB = env.cache_MB @@ -189,26 +188,19 @@ class BlockProcessor(server.db.DB): '''Called by the prefetcher when it first catches up.''' self.add_task(self.first_caught_up) - def on_shutdown(self): - '''Called by the controller to shut processing down.''' - async def do_nothing(): - pass - self.logger.info('preparing clean shutdown') - self.stop = True - # Ensure something is on the queue so main_loop notices self.stop - self.add_task(do_nothing) - async def main_loop(self): '''Main loop for block processing.''' await self.prefetcher.reset_height() - while not self.stop: + while True: task = await self.task_queue.get() await task() - self.logger.info('flushing state to DB for a clean shutdown...') - await self.executor(self.flush, True) - self.logger.info('shutdown complete') + def shutdown(self): + if self.height != self.db_height: + self.logger.info('flushing state to DB for a clean shutdown...') + self.flush(True) + self.logger.info('shutdown complete') async def executor(self, func, *args, **kwargs): '''Run func taking args in the executor.''' diff --git a/server/controller.py b/server/controller.py index ee8bf64..e14291a 100644 --- a/server/controller.py +++ b/server/controller.py @@ -13,6 +13,7 @@ import ssl import time from bisect import bisect_left from collections import defaultdict +from concurrent.futures import ThreadPoolExecutor from functools import partial import pylru @@ -53,6 +54,8 @@ class Controller(util.LoggedClass): # Set this event to cleanly shutdown self.shutdown_event = asyncio.Event() self.loop = asyncio.get_event_loop() + self.executor = ThreadPoolExecutor() + self.loop.set_default_executor(self.executor) self.start = time.time() self.coin = env.coin self.daemon = Daemon(env.coin.daemon_urls(env.daemon_url)) @@ -215,8 +218,8 @@ class Controller(util.LoggedClass): for n in range(4): add_future(self.serve_requests()) - bp_future = asyncio.ensure_future(self.bp.main_loop()) futures = [] + add_future(self.bp.main_loop()) add_future(self.bp.prefetcher.main_loop()) add_future(await_bp_catchup()) @@ -225,35 +228,36 @@ class Controller(util.LoggedClass): self.logger.info('shutting down gracefully') self.state = self.SHUTTING_DOWN - # First tell the block processor to shut down, it may need to - # perform a lengthy flush. Then shut down the rest. - self.bp.on_shutdown() + # Close servers and sessions self.close_servers(list(self.servers.keys())) + for session in self.sessions: + self.close_session(session) + + # Cancel the futures for future in futures: future.cancel() - # Now wait for the cleanup to complete - await self.close_sessions() - if not bp_future.done(): - self.logger.info('waiting for block processor') - await bp_future + await asyncio.wait(futures) + + # Wait for the executor to finish anything it's doing + self.executor.shutdown() + self.bp.shutdown() def close_servers(self, kinds): '''Close the servers of the given kinds (TCP etc.).''' - self.logger.info('closing down {} listening servers' - .format(', '.join(kinds))) + if kinds: + self.logger.info('closing down {} listening servers' + .format(', '.join(kinds))) for kind in kinds: server = self.servers.pop(kind, None) if server: server.close() - async def close_sessions(self, secs=30): + async def wait_for_sessions(self, secs=30): if not self.sessions: return self.logger.info('waiting up to {:d} seconds for socket cleanup' .format(secs)) - for session in self.sessions: - self.close_session(session) limit = time.time() + secs while self.sessions and time.time() < limit: self.clear_stale_sessions(grace=secs//2) diff --git a/server/db.py b/server/db.py index 0f920d5..44dacf3 100644 --- a/server/db.py +++ b/server/db.py @@ -123,7 +123,8 @@ class DB(util.LoggedClass): .format(util.formatted_time(self.wall_time))) def read_utxo_state(self): - if self.utxo_db.is_new: + state = self.utxo_db.get(b'state') + if not state: self.db_height = -1 self.db_tx_count = 0 self.db_tip = b'\0' * 32 @@ -132,9 +133,7 @@ class DB(util.LoggedClass): self.wall_time = 0 self.first_sync = True else: - state = self.utxo_db.get(b'state') - if state: - state = ast.literal_eval(state.decode()) + state = ast.literal_eval(state.decode()) if not isinstance(state, dict): raise self.DBError('failed reading state from DB') self.db_version = state['db_version']