diff --git a/lib/server_base.py b/lib/server_base.py new file mode 100644 index 0000000..e59df48 --- /dev/null +++ b/lib/server_base.py @@ -0,0 +1,128 @@ +# Copyright (c) 2017, Neil Booth +# +# All rights reserved. +# +# See the file "LICENCE" for information about the copyright +# and warranty status of this software. + +import asyncio +import os +import signal +import sys +import time +from functools import partial + +import lib.util as util + + +class ServerBase(util.LoggedClass): + '''Base class server implementation. + + Derived classes are expected to: + + - set PYTHON_MIN_VERSION and SUPPRESS_MESSAGES as appropriate + - implement the start_servers() coroutine, called from the run() method. + Upon return the event loop runs until the shutdown signal is received. + - implement the shutdown() coroutine + ''' + + SUPPRESS_MESSAGES = [ + 'Fatal read error on socket transport', + 'Fatal write error on socket transport', + ] + + PYTHON_MIN_VERSION = (3, 6) + + def __init__(self, env): + '''Save the environment, perform basic sanity checks, and set the + event loop policy. + ''' + super().__init__() + self.env = env + + # Sanity checks + if sys.version_info < self.PYTHON_MIN_VERSION: + mvs = '.'.join(str(part) for part in self.PYTHON_MIN_VERSION) + raise RuntimeError('Python version >= {} is required'.format(mvs)) + + if os.geteuid() == 0 and not env.allow_root: + raise RuntimeError('RUNNING AS ROOT IS STRONGLY DISCOURAGED!\n' + 'You shoud create an unprivileged user account ' + 'and use that.\n' + 'To continue as root anyway, restart with ' + 'environment variable ALLOW_ROOT non-empty') + + # First asyncio operation must be to set the event loop policy + # as this replaces the event loop + self.logger.info('event loop policy: {}'.format(self.env.loop_policy)) + asyncio.set_event_loop_policy(self.env.loop_policy) + + # Trigger this event to cleanly shutdown + self.shutdown_event = asyncio.Event() + + async def start_servers(self): + '''Override to perform initialization that requires the event loop, + and start servers.''' + pass + + async def shutdown(self): + '''Override to perform the shutdown sequence, if any.''' + pass + + async def _wait_for_shutdown_event(self): + '''Wait for shutdown to be signalled, and log it. + + Derived classes may want to provide a shutdown() coroutine.''' + # Shut down cleanly after waiting for shutdown to be signalled + await self.shutdown_event.wait() + self.logger.info('shutting down') + + # Wait for the shutdown sequence + await self.shutdown() + + # Finally, work around an apparent asyncio bug that causes log + # spew on shutdown for partially opened SSL sockets + try: + del asyncio.sslproto._SSLProtocolTransport.__del__ + except Exception: + pass + + self.logger.info('shutdown complete') + + def on_signal(self, signame): + '''Call on receipt of a signal to cleanly shutdown.''' + self.logger.warning('received {} signal, initiating shutdown' + .format(signame)) + self.shutdown_event.set() + + def on_exception(self, loop, context): + '''Suppress spurious messages it appears we cannot control.''' + message = context.get('message') + if message in self.SUPPRESS_MESSAGES: + return + if 'accept_connection2()' in repr(context.get('task')): + return + loop.default_exception_handler(context) + + def run(self): + '''Run the server application: + + - record start time + - set the event loop policy as specified by the environment + - install SIGINT and SIGKILL handlers to trigger shutdown_event + - set loop's exception handler to suppress unwanted messages + - run the event loop until start_servers() completes + - run the event loop until shutdown is signalled + ''' + self.start_time = time.time() + + loop = asyncio.get_event_loop() + + for signame in ('SIGINT', 'SIGTERM'): + loop.add_signal_handler(getattr(signal, signame), + partial(self.on_signal, signame)) + loop.set_exception_handler(self.on_exception) + + loop.run_until_complete(self.start_servers()) + loop.run_until_complete(self._wait_for_shutdown_event()) + loop.close() diff --git a/server/controller.py b/server/controller.py index 59ae4d5..3df855f 100644 --- a/server/controller.py +++ b/server/controller.py @@ -8,9 +8,7 @@ import asyncio import json import os -import signal import ssl -import sys import time import traceback from bisect import bisect_left @@ -23,6 +21,7 @@ import pylru from lib.jsonrpc import JSONSessionBase, RPCError from lib.hash import double_sha256, hash_to_str, hex_str_to_hash from lib.peer import Peer +from lib.server_base import ServerBase import lib.util as util from server.daemon import DaemonError from server.mempool import MemPool @@ -30,50 +29,22 @@ from server.peers import PeerManager from server.session import LocalRPC -class Controller(util.LoggedClass): +class Controller(ServerBase): '''Manages the client servers, a mempool, and a block processor. Servers are started immediately the block processor first catches up with the daemon. ''' + PYTHON_MIN_VERSION = (3, 5, 3) BANDS = 5 CATCHING_UP, LISTENING, PAUSED, SHUTTING_DOWN = range(4) - SUPPRESS_MESSAGES = [ - 'Fatal read error on socket transport', - 'Fatal write error on socket transport', - ] def __init__(self, env): - super().__init__() + '''Initialize everything that doesn't require the event loop.''' + super().__init__(env) - # Sanity checks - if sys.version_info < (3, 5, 3): - raise RuntimeError('Python >= 3.5.3 is required to run ElectrumX') - - if os.geteuid() == 0 and not env.allow_root: - raise RuntimeError('RUNNING AS ROOT IS STRONGLY DISCOURAGED!\n' - 'You shoud create an unprivileged user account ' - 'and use that.\n' - 'To continue as root anyway, restart with ' - 'environment variable ALLOW_ROOT non-empty') - - # Set the event loop policy before doing anything asyncio - self.logger.info('event loop policy: {}'.format(env.loop_policy)) - asyncio.set_event_loop_policy(env.loop_policy) - self.loop = asyncio.get_event_loop() - - # Set this event to cleanly shutdown - self.shutdown_event = asyncio.Event() - self.executor = ThreadPoolExecutor() - self.loop.set_default_executor(self.executor) - self.start_time = time.time() self.coin = env.coin - self.daemon = self.coin.DAEMON(env) - self.bp = self.coin.BLOCK_PROCESSOR(env, self, self.daemon) - self.mempool = MemPool(self.bp, self) - self.peer_mgr = PeerManager(env, self) - self.env = env self.servers = {} # Map of session to the key of its list in self.groups self.sessions = {} @@ -98,6 +69,47 @@ class Controller(util.LoggedClass): 'sessions stop'.split()) self.rpc_handlers = {cmd: getattr(self, 'rpc_' + cmd) for cmd in cmds} + self.loop = asyncio.get_event_loop() + self.executor = ThreadPoolExecutor() + self.loop.set_default_executor(self.executor) + + # The complex objects. Note PeerManager references self.loop (ugh) + self.daemon = self.coin.DAEMON(env) + self.bp = self.coin.BLOCK_PROCESSOR(env, self, self.daemon) + self.mempool = MemPool(self.bp, self) + self.peer_mgr = PeerManager(env, self) + + async def start_servers(self): + '''Start the RPC server and schedule the external servers to be + started once the block processor has caught up. + ''' + if self.env.rpc_port is not None: + await self.start_server('RPC', self.env.cs_host(for_rpc=True), + self.env.rpc_port) + + self.ensure_future(self.bp.main_loop()) + self.ensure_future(self.wait_for_bp_catchup()) + + async def shutdown(self): + '''Perform the shutdown sequence.''' + self.state = self.SHUTTING_DOWN + + # Close servers and sessions + self.close_servers(list(self.servers.keys())) + for session in self.sessions: + self.close_session(session) + + # Cancel pending futures + for future in self.futures: + future.cancel() + + # Wait for all futures to finish + while not all(future.done() for future in self.futures): + await asyncio.sleep(0.1) + + # Finally shut down the block processor and executor + self.bp.shutdown(self.executor) + async def mempool_transactions(self, hashX): '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool entries for the hashX. @@ -195,58 +207,16 @@ class Controller(util.LoggedClass): self.next_log_sessions = time.time() + self.env.log_sessions async def wait_for_bp_catchup(self): - '''Called when the block processor catches up.''' + '''Wait for the block processor to catch up, then kick off server + background processes.''' await self.bp.caught_up_event.wait() self.logger.info('block processor has caught up') self.ensure_future(self.peer_mgr.main_loop()) - self.ensure_future(self.start_servers()) + self.ensure_future(self.log_start_external_servers()) self.ensure_future(self.housekeeping()) self.ensure_future(self.mempool.main_loop()) self.ensure_future(self.notify()) - async def main_loop(self): - '''Controller main loop.''' - if self.env.rpc_port is not None: - await self.start_server('RPC', self.env.cs_host(for_rpc=True), - self.env.rpc_port) - self.ensure_future(self.bp.main_loop()) - self.ensure_future(self.wait_for_bp_catchup()) - - # Shut down cleanly after waiting for shutdown to be signalled - await self.shutdown_event.wait() - self.logger.info('shutting down') - await self.shutdown() - # Avoid log spew on shutdown for partially opened SSL sockets - try: - del asyncio.sslproto._SSLProtocolTransport.__del__ - except Exception: - pass - self.logger.info('shutdown complete') - - def initiate_shutdown(self): - '''Call this function to start the shutdown process.''' - self.shutdown_event.set() - - async def shutdown(self): - '''Perform the shutdown sequence.''' - self.state = self.SHUTTING_DOWN - - # Close servers and sessions - self.close_servers(list(self.servers.keys())) - for session in self.sessions: - self.close_session(session) - - # Cancel pending futures - for future in self.futures: - future.cancel() - - # Wait for all futures to finish - while not all(future.done() for future in self.futures): - await asyncio.sleep(0.1) - - # Finally shut down the block processor and executor - self.bp.shutdown(self.executor) - def close_servers(self, kinds): '''Close the servers of the given kinds (TCP etc.).''' if kinds: @@ -272,7 +242,7 @@ class Controller(util.LoggedClass): self.logger.info('{} server listening on {}:{:d}' .format(kind, host, port)) - async def start_servers(self): + async def log_start_external_servers(self): '''Start TCP and SSL servers.''' self.logger.info('max session count: {:,d}'.format(self.max_sessions)) self.logger.info('session timeout: {:,d} seconds' @@ -628,7 +598,7 @@ class Controller(util.LoggedClass): def rpc_stop(self): '''Shut down the server cleanly.''' - self.initiate_shutdown() + self.shutdown_event.set() return 'stopping' def rpc_getinfo(self): @@ -907,32 +877,3 @@ class Controller(util.LoggedClass): if index >= len(tx.outputs): return None return self.coin.address_from_script(tx.outputs[index].pk_script) - - # Signal, exception handlers. - - def on_signal(self, signame): - '''Call on receipt of a signal to cleanly shutdown.''' - self.logger.warning('received {} signal, initiating shutdown' - .format(signame)) - self.initiate_shutdown() - - def on_exception(self, loop, context): - '''Suppress spurious messages it appears we cannot control.''' - message = context.get('message') - if message not in self.SUPPRESS_MESSAGES: - if not ('task' in context and - 'accept_connection2()' in repr(context.get('task'))): - loop.default_exception_handler(context) - - def run(self): - # Install signal handlers and exception handler - loop = self.loop - for signame in ('SIGINT', 'SIGTERM'): - loop.add_signal_handler(getattr(signal, signame), - partial(self.on_signal, signame)) - loop.set_exception_handler(self.on_exception) - - # Run the main loop to completion - future = asyncio.ensure_future(self.main_loop()) - loop.run_until_complete(future) - loop.close()