diff --git a/server/block_processor.py b/server/block_processor.py index 6efabbb..ea8fb8c 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -196,11 +196,14 @@ class BlockProcessor(server.db.DB): task = await self.task_queue.get() await task() - def shutdown(self): + def shutdown(self, executor): + '''Shutdown cleanly and flush to disk.''' + # First stut down the executor; it may be processing a block. + # Then we can flush anything remaining to disk. + executor.shutdown() if self.height != self.db_height: self.logger.info('flushing state to DB for a clean shutdown...') self.flush(True) - self.logger.info('shutdown complete') async def executor(self, func, *args, **kwargs): '''Run func taking args in the executor.''' diff --git a/server/controller.py b/server/controller.py index 0b6e5f3..bfc49ee 100644 --- a/server/controller.py +++ b/server/controller.py @@ -225,7 +225,13 @@ class Controller(util.LoggedClass): # Perform a clean shutdown when this event is signalled. await self.shutdown_event.wait() - self.logger.info('shutting down gracefully') + + self.logger.info('shutting down') + await self.shutdown(futures) + self.logger.info('shutdown complete') + + async def shutdown(self, futures): + '''Perform the shutdown sequence.''' self.state = self.SHUTTING_DOWN # Close servers and sessions @@ -237,11 +243,12 @@ class Controller(util.LoggedClass): for future in futures: future.cancel() - await asyncio.wait(futures) + # Wait for all futures to finish + while any(not future.done() for future in futures): + await asyncio.sleep(1) - # Wait for the executor to finish anything it's doing - self.executor.shutdown() - self.bp.shutdown() + # Finally shut down the block processor and executor + self.bp.shutdown(self.executor) def close_servers(self, kinds): '''Close the servers of the given kinds (TCP etc.).''' @@ -253,18 +260,6 @@ class Controller(util.LoggedClass): if server: server.close() - async def wait_for_sessions(self, secs=30): - if not self.sessions: - return - self.logger.info('waiting up to {:d} seconds for socket cleanup' - .format(secs)) - limit = time.time() + secs - while self.sessions and time.time() < limit: - self.clear_stale_sessions(grace=secs//2) - await asyncio.sleep(2) - self.logger.info('{:,d} sessions remaining' - .format(len(self.sessions))) - async def start_server(self, kind, *args, **kw_args): protocol_class = LocalRPC if kind == 'RPC' else ElectrumX protocol = partial(protocol_class, self, self.bp, self.env, kind)