From 530c7cac6f0af153ad0340d4fe3907042f4a9c21 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Fri, 20 Jul 2018 12:44:33 +0800 Subject: [PATCH] Clean up shutdown process of the block processor - enables removal of executor code - clarify flush guarantees --- docs/changelog.rst | 3 ++ electrumx/lib/server_base.py | 59 +++++++++++++------------- electrumx/lib/tasks.py | 12 +++--- electrumx/server/block_processor.py | 64 +++++++++++++++++------------ electrumx/server/chain_state.py | 8 ++-- electrumx/server/controller.py | 16 ++++---- electrumx/server/session.py | 5 ++- 7 files changed, 91 insertions(+), 76 deletions(-) diff --git a/docs/changelog.rst b/docs/changelog.rst index d4d1677..33ce806 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -13,6 +13,9 @@ Version 1.7 (in progress) Version 1.6.1 (in progress) ============================ +* cleaner shutdown process with clear guarantees +* add Motion coin (ocruzv) + Version 1.6 (19 July 2018) =========================== diff --git a/electrumx/lib/server_base.py b/electrumx/lib/server_base.py index 60adc12..a68952b 100644 --- a/electrumx/lib/server_base.py +++ b/electrumx/lib/server_base.py @@ -12,6 +12,7 @@ import sys import time from functools import partial +from electrumx.lib.tasks import Tasks from electrumx.lib.util import class_logger @@ -37,9 +38,14 @@ class ServerBase(object): '''Save the environment, perform basic sanity checks, and set the event loop policy. ''' + # First asyncio operation must be to set the event loop policy + # as this replaces the event loop + asyncio.set_event_loop_policy(env.loop_policy) + self.logger = class_logger(__name__, self.__class__.__name__) self.logger.info(f'Python version: {sys.version}') self.env = env + self.tasks = Tasks() # Sanity checks if sys.version_info < self.PYTHON_MIN_VERSION: @@ -53,10 +59,6 @@ class ServerBase(object): 'To continue as root anyway, restart with ' 'environment variable ALLOW_ROOT non-empty') - # First asyncio operation must be to set the event loop policy - # as this replaces the event loop - asyncio.set_event_loop_policy(self.env.loop_policy) - # Trigger this event to cleanly shutdown self.shutdown_event = asyncio.Event() @@ -69,26 +71,6 @@ class ServerBase(object): '''Override to perform the shutdown sequence, if any.''' pass - async def _wait_for_shutdown_event(self): - '''Wait for shutdown to be signalled, and log it. - - Derived classes may want to provide a shutdown() coroutine.''' - # Shut down cleanly after waiting for shutdown to be signalled - await self.shutdown_event.wait() - self.logger.info('shutting down') - - # Wait for the shutdown sequence - await self.shutdown() - - # Finally, work around an apparent asyncio bug that causes log - # spew on shutdown for partially opened SSL sockets - try: - del asyncio.sslproto._SSLProtocolTransport.__del__ - except Exception: - pass - - self.logger.info('shutdown complete') - def on_signal(self, signame): '''Call on receipt of a signal to cleanly shutdown.''' self.logger.warning('received {} signal, initiating shutdown' @@ -104,7 +86,7 @@ class ServerBase(object): return loop.default_exception_handler(context) - def run(self): + async def _main(self, loop): '''Run the server application: - record start time @@ -116,13 +98,32 @@ class ServerBase(object): ''' self.start_time = time.time() - loop = asyncio.get_event_loop() - for signame in ('SIGINT', 'SIGTERM'): loop.add_signal_handler(getattr(signal, signame), partial(self.on_signal, signame)) loop.set_exception_handler(self.on_exception) - loop.run_until_complete(self.start_servers()) - loop.run_until_complete(self._wait_for_shutdown_event()) + self.tasks.create_task(self.start_servers()) + + # Wait for shutdown to be signalled, and log it. + # Derived classes may want to provide a shutdown() coroutine. + await self.shutdown_event.wait() + self.logger.info('shutting down') + await self.shutdown() + + # Let the loop clean itself up; prevents some silly logs + await asyncio.sleep(0.001) + + # Finally, work around an apparent asyncio bug that causes log + # spew on shutdown for partially opened SSL sockets + try: + del asyncio.sslproto._SSLProtocolTransport.__del__ + except Exception: + pass + + self.logger.info('shutdown complete') + + def run(self): + loop = asyncio.get_event_loop() + loop.run_until_complete(self._main(loop)) loop.close() diff --git a/electrumx/lib/tasks.py b/electrumx/lib/tasks.py index 6b540a9..2ed73c5 100644 --- a/electrumx/lib/tasks.py +++ b/electrumx/lib/tasks.py @@ -26,8 +26,6 @@ '''Concurrency via tasks and threads.''' -from concurrent.futures import ThreadPoolExecutor - from aiorpcx import TaskSet import electrumx.lib.util as util @@ -40,12 +38,8 @@ class Tasks(object): def __init__(self, *, loop=None): self.tasks = TaskSet(loop=loop) self.logger = util.class_logger(__name__, self.__class__.__name__) - # FIXME: is the executor still needed? - self.executor = ThreadPoolExecutor() - self.tasks.loop.set_default_executor(self.executor) # Pass through until integrated self.loop = self.tasks.loop - self.cancel_all = self.tasks.cancel_all self.wait = self.tasks.wait async def run_in_thread(self, func, *args): @@ -65,3 +59,9 @@ class Tasks(object): task.result() except Exception as e: self.logger.exception(f'uncaught task exception: {e}') + + async def cancel_all(self, wait=True): + '''Cancels all tasks and waits for them to complete.''' + self.tasks.cancel_all() + if wait: + await self.tasks.wait() diff --git a/electrumx/server/block_processor.py b/electrumx/server/block_processor.py index daa0de0..a18708d 100644 --- a/electrumx/server/block_processor.py +++ b/electrumx/server/block_processor.py @@ -154,6 +154,7 @@ class BlockProcessor(electrumx.server.db.DB): self._caught_up_event = asyncio.Event() self.task_queue = asyncio.Queue() + self.prefetcher = Prefetcher(self) # Meta self.cache_MB = env.cache_MB @@ -175,7 +176,10 @@ class BlockProcessor(electrumx.server.db.DB): self.utxo_cache = {} self.db_deletes = [] - self.prefetcher = Prefetcher(self) + # If the lock is successfully acquired, in-memory chain state + # is consistent with self.height + self.state_lock = asyncio.Lock() + self.worker_task = None def add_task(self, task): '''Add the task to our task queue.''' @@ -201,15 +205,6 @@ class BlockProcessor(electrumx.server.db.DB): ''' self.callbacks.append(callback) - def shutdown(self, executor): - '''Shutdown cleanly and flush to disk.''' - # First stut down the executor; it may be processing a block. - # Then we can flush anything remaining to disk. - executor.shutdown() - if self.height != self.db_height: - self.logger.info('flushing state to DB for a clean shutdown...') - self.flush(True) - async def check_and_advance_blocks(self, raw_blocks, first): '''Process the list of raw blocks passed. Detects and handles reorgs. @@ -232,7 +227,8 @@ class BlockProcessor(electrumx.server.db.DB): if hprevs == chain: start = time.time() - await self.tasks.run_in_thread(self.advance_blocks, blocks) + async with self.state_lock: + await self.tasks.run_in_thread(self.advance_blocks, blocks) if not self.first_sync: s = '' if len(blocks) == 1 else 's' self.logger.info('processed {:,d} block{} in {:.1f}s' @@ -253,16 +249,6 @@ class BlockProcessor(electrumx.server.db.DB): 'resetting the prefetcher') await self.prefetcher.reset_height() - def force_chain_reorg(self, count): - '''Force a reorg of the given number of blocks. - - Returns True if a reorg is queued, false if not caught up. - ''' - if self._caught_up_event.is_set(): - self.add_task(partial(self.reorg_chain, count=count)) - return True - return False - async def reorg_chain(self, count=None): '''Handle a chain reorganisation. @@ -289,7 +275,8 @@ class BlockProcessor(electrumx.server.db.DB): last = start + count - 1 for hex_hashes in chunks(hashes, 50): raw_blocks = await get_raw_blocks(last, hex_hashes) - await self.tasks.run_in_thread(self.backup_blocks, raw_blocks) + async with self.state_lock: + await self.tasks.run_in_thread(self.backup_blocks, raw_blocks) last -= len(raw_blocks) # Truncate header_mc: header count is 1 more than the height self.header_mc.truncate(self.height + 1) @@ -770,8 +757,8 @@ class BlockProcessor(electrumx.server.db.DB): self.db_height = self.height self.db_tip = self.tip - async def _process_blocks_forever(self): - '''Loop forever processing blocks.''' + async def _process_queue(self): + '''Loop forever processing enqueued work.''' while True: task = await self.task_queue.get() await task() @@ -805,13 +792,13 @@ class BlockProcessor(electrumx.server.db.DB): self.tasks.create_task(self.prefetcher.main_loop()) await self.prefetcher.reset_height() # Start our loop that processes blocks as they are fetched - self.tasks.create_task(self._process_blocks_forever()) + self.worker_task = self.tasks.create_task(self._process_queue()) # Wait until caught up await self._caught_up_event.wait() # Flush everything but with first_sync->False state. first_sync = self.first_sync self.first_sync = False - await self.tasks.run_in_thread(self.flush, True) + self.flush(True) if first_sync: self.logger.info(f'{electrumx.version} synced to ' f'height {self.height:,d}') @@ -822,3 +809,28 @@ class BlockProcessor(electrumx.server.db.DB): length = max(1, self.height - self.env.reorg_limit) self.header_mc = MerkleCache(self.merkle, HeaderSource(self), length) self.logger.info('populated header merkle cache') + + def force_chain_reorg(self, count): + '''Force a reorg of the given number of blocks. + + Returns True if a reorg is queued, false if not caught up. + ''' + if self._caught_up_event.is_set(): + self.add_task(partial(self.reorg_chain, count=count)) + return True + return False + + async def shutdown(self): + '''Shutdown cleanly and flush to disk. + + If during initial sync ElectrumX is asked to shut down when a + large number of blocks have been processed but not written to + disk, it should write those to disk before exiting, as + otherwise a significant amount of work could be lost. + ''' + if self.worker_task: + async with self.state_lock: + # Shut down block processing + self.worker_task.cancel() + self.logger.info('flushing to DB for a clean shutdown...') + self.flush(True) diff --git a/electrumx/server/chain_state.py b/electrumx/server/chain_state.py index d0656ff..0a3f6b7 100644 --- a/electrumx/server/chain_state.py +++ b/electrumx/server/chain_state.py @@ -16,10 +16,9 @@ class ChainState(object): blocks, transaction history, UTXOs and the mempool. ''' - def __init__(self, env, tasks, shutdown_event): + def __init__(self, env, tasks): self.env = env self.tasks = tasks - self.shutdown_event = shutdown_event self.daemon = env.coin.DAEMON(env) BlockProcessor = env.coin.BLOCK_PROCESSOR self.bp = BlockProcessor(env, tasks, self.daemon) @@ -103,8 +102,9 @@ class ChainState(object): self.daemon.set_urls(self.env.coin.daemon_urls(daemon_url)) return self.daemon.logged_url() - def shutdown(self): - self.tasks.loop.call_soon(self.shutdown_event.set) + async def shutdown(self): + '''Shut down the block processor to flush chain state to disk.''' + await self.bp.shutdown() async def wait_for_mempool(self): await self.bp.catch_up_to_daemon() diff --git a/electrumx/server/controller.py b/electrumx/server/controller.py index 8ed3abe..a347304 100644 --- a/electrumx/server/controller.py +++ b/electrumx/server/controller.py @@ -9,7 +9,6 @@ from aiorpcx import _version as aiorpcx_version import electrumx from electrumx.lib.server_base import ServerBase -from electrumx.lib.tasks import Tasks from electrumx.lib.util import version_string from electrumx.server.chain_state import ChainState from electrumx.server.peers import PeerManager @@ -39,11 +38,10 @@ class Controller(ServerBase): self.logger.info(f'supported protocol versions: {min_str}-{max_str}') self.logger.info(f'event loop policy: {env.loop_policy}') - self.tasks = Tasks() - self.chain_state = ChainState(env, self.tasks, self.shutdown_event) + self.chain_state = ChainState(env, self.tasks) self.peer_mgr = PeerManager(env, self.tasks, self.chain_state) self.session_mgr = SessionManager(env, self.tasks, self.chain_state, - self.peer_mgr) + self.peer_mgr, self.shutdown_event) async def start_servers(self): '''Start the RPC server and wait for the mempool to synchronize. Then @@ -56,9 +54,9 @@ class Controller(ServerBase): async def shutdown(self): '''Perform the shutdown sequence.''' - # Not certain of ordering here - self.tasks.cancel_all() + # Close servers and connections - main source of new task creation await self.session_mgr.shutdown() - await self.tasks.wait() - # Finally shut down the block processor and executor (FIXME) - self.chain_state.bp.shutdown(self.tasks.executor) + # Flush chain state to disk + await self.chain_state.shutdown() + # Cancel all tasks; this shuts down the peer manager and prefetcher + await self.tasks.cancel_all(wait=True) diff --git a/electrumx/server/session.py b/electrumx/server/session.py index ea6ce49..4be78b1 100644 --- a/electrumx/server/session.py +++ b/electrumx/server/session.py @@ -97,12 +97,13 @@ class SessionManager(object): CATCHING_UP, LISTENING, PAUSED, SHUTTING_DOWN = range(4) - def __init__(self, env, tasks, chain_state, peer_mgr): + def __init__(self, env, tasks, chain_state, peer_mgr, shutdown_event): env.max_send = max(350000, env.max_send) self.env = env self.tasks = tasks self.chain_state = chain_state self.peer_mgr = peer_mgr + self.shutdown_event = shutdown_event self.logger = util.class_logger(__name__, self.__class__.__name__) self.servers = {} self.sessions = set() @@ -361,7 +362,7 @@ class SessionManager(object): def rpc_stop(self): '''Shut down the server cleanly.''' - self.chain_state.shutdown() + self.shutdown_event.set() return 'stopping' def rpc_getinfo(self):