diff --git a/docs/ENVIRONMENT.rst b/docs/ENVIRONMENT.rst index 4c7bc9a..d730b35 100644 --- a/docs/ENVIRONMENT.rst +++ b/docs/ENVIRONMENT.rst @@ -105,6 +105,14 @@ These environment variables are optional: to blank. The default is appropriate for **COIN** and **NET** (e.g., 8000 for Bitcoin mainnet) if not set. +* **EVENT_LOOP_POLICY** + + The name of an event loop policy to replace the default asyncio + policy, if any. At present only `uvloop` is accepted, in which case + you must have installed the `uvloop`_ Python package. + + If you are not sure what this means leave it unset. + * **DONATION_ADDRESS** The server donation address reported to Electrum clients. Defaults @@ -372,3 +380,4 @@ your available physical RAM: variables is roughly equivalent. .. _lib/coins.py: https://github.com/kyuupichan/electrumx/blob/master/lib/coins.py +.. _uvloop: https://pypi.python.org/pypi/uvloop diff --git a/electrumx_server.py b/electrumx_server.py index 946d445..77c7c6e 100755 --- a/electrumx_server.py +++ b/electrumx_server.py @@ -9,70 +9,20 @@ '''Script to kick off the server.''' -import asyncio import logging -import os -import signal -import sys import traceback -from functools import partial from server.env import Env from server.controller import Controller -SUPPRESS_MESSAGES = [ - 'Fatal read error on socket transport', - 'Fatal write error on socket transport', -] - - -def main_loop(): - '''Start the server.''' - if sys.version_info < (3, 5, 3): - raise RuntimeError('Python >= 3.5.3 is required to run ElectrumX') - - if os.geteuid() == 0: - raise RuntimeError('DO NOT RUN AS ROOT! Create an unprivileged user ' - 'account and use that') - - loop = asyncio.get_event_loop() - # loop.set_debug(True) - - def on_signal(signame): - '''Call on receipt of a signal to cleanly shutdown.''' - logging.warning('received {} signal, initiating shutdown' - .format(signame)) - controller.initiate_shutdown() - - def on_exception(loop, context): - '''Suppress spurious messages it appears we cannot control.''' - message = context.get('message') - if message not in SUPPRESS_MESSAGES: - if not ('task' in context and - 'accept_connection2()' in repr(context.get('task'))): - loop.default_exception_handler(context) - - controller = Controller(Env()) - future = asyncio.ensure_future(controller.main_loop()) - - # Install signal handlers - for signame in ('SIGINT', 'SIGTERM'): - loop.add_signal_handler(getattr(signal, signame), - partial(on_signal, signame)) - - # Install exception handler - loop.set_exception_handler(on_exception) - loop.run_until_complete(future) - loop.close() - - def main(): - '''Set up logging, enter main loop.''' + '''Set up logging and run the server.''' logging.basicConfig(level=logging.INFO) logging.info('ElectrumX server starting') try: - main_loop() + controller = Controller(Env()) + controller.run() except Exception: traceback.print_exc() logging.critical('ElectrumX server terminated abnormally') diff --git a/lib/socks.py b/lib/socks.py index 55d5715..961781e 100644 --- a/lib/socks.py +++ b/lib/socks.py @@ -136,7 +136,7 @@ class Socks(util.LoggedClass): class SocksProxy(util.LoggedClass): - def __init__(self, host, port, loop=None): + def __init__(self, host, port, loop): '''Host can be an IPv4 address, IPv6 address, or a host name. Port can be None, in which case one is auto-detected.''' super().__init__() @@ -147,7 +147,7 @@ class SocksProxy(util.LoggedClass): self.ip_addr = None self.lost_event = asyncio.Event() self.tried_event = asyncio.Event() - self.loop = loop or asyncio.get_event_loop() + self.loop = loop self.set_lost() async def auto_detect_loop(self): diff --git a/server/controller.py b/server/controller.py index f48b661..ed331d5 100644 --- a/server/controller.py +++ b/server/controller.py @@ -8,7 +8,9 @@ import asyncio import json import os +import signal import ssl +import sys import time import traceback from bisect import bisect_left @@ -37,12 +39,29 @@ class Controller(util.LoggedClass): BANDS = 5 CATCHING_UP, LISTENING, PAUSED, SHUTTING_DOWN = range(4) + SUPPRESS_MESSAGES = [ + 'Fatal read error on socket transport', + 'Fatal write error on socket transport', + ] def __init__(self, env): super().__init__() + + # Sanity checks + if sys.version_info < (3, 5, 3): + raise RuntimeError('Python >= 3.5.3 is required to run ElectrumX') + + if os.geteuid() == 0: + raise RuntimeError('DO NOT RUN AS ROOT! Create an unprivileged ' + 'user account and use that') + + # Set the event loop policy before doing anything asyncio + self.logger.info('event loop policy: {}'.format(env.loop_policy)) + asyncio.set_event_loop_policy(env.loop_policy) + self.loop = asyncio.get_event_loop() + # Set this event to cleanly shutdown self.shutdown_event = asyncio.Event() - self.loop = asyncio.get_event_loop() self.executor = ThreadPoolExecutor() self.loop.set_default_executor(self.executor) self.start_time = time.time() @@ -872,3 +891,32 @@ class Controller(util.LoggedClass): def donation_address(self): '''Return the donation address as a string, empty if there is none.''' return self.env.donation_address + + # Signal, exception handlers. + + def on_signal(self, signame): + '''Call on receipt of a signal to cleanly shutdown.''' + self.logger.warning('received {} signal, initiating shutdown' + .format(signame)) + self.initiate_shutdown() + + def on_exception(self, loop, context): + '''Suppress spurious messages it appears we cannot control.''' + message = context.get('message') + if message not in self.SUPPRESS_MESSAGES: + if not ('task' in context and + 'accept_connection2()' in repr(context.get('task'))): + loop.default_exception_handler(context) + + def run(self): + # Install signal handlers and exception handler + loop = self.loop + for signame in ('SIGINT', 'SIGTERM'): + loop.add_signal_handler(getattr(signal, signame), + partial(self.on_signal, signame)) + loop.set_exception_handler(self.on_exception) + + # Run the main loop to completion + future = asyncio.ensure_future(self.main_loop()) + loop.run_until_complete(future) + loop.close() diff --git a/server/env.py b/server/env.py index 4f5f671..a002549 100644 --- a/server/env.py +++ b/server/env.py @@ -66,6 +66,7 @@ class Env(lib_util.LoggedClass): self.max_session_subs = self.integer('MAX_SESSION_SUBS', 50000) self.bandwidth_limit = self.integer('BANDWIDTH_LIMIT', 2000000) self.session_timeout = self.integer('SESSION_TIMEOUT', 600) + self.loop_policy = self.event_loop_policy() # IRC self.irc = self.boolean('IRC', False) self.irc_nick = self.default('IRC_NICK', None) @@ -173,3 +174,12 @@ class Env(lib_util.LoggedClass): ssl_port, '_tor', ) + + def event_loop_policy(self): + policy = self.default('EVENT_LOOP_POLICY', None) + if policy is None: + return None + if policy == 'uvloop': + import uvloop + return uvloop.EventLoopPolicy() + raise self.Error('unknown event loop policy "{}"'.format(policy)) diff --git a/tests/server/test_env.py b/tests/server/test_env.py index 4a7f944..da4923f 100644 --- a/tests/server/test_env.py +++ b/tests/server/test_env.py @@ -158,6 +158,19 @@ def test_BANNER_FILE(): assert e.banner_file == 'banner_file' assert e.tor_banner_file == 'tor_banner_file' +def test_EVENT_LOOP_POLICY(): + e = Env() + assert e.loop_policy is None + os.environ['EVENT_LOOP_POLICY'] = 'foo' + with pytest.raises(Env.Error): + Env() + os.environ['EVENT_LOOP_POLICY'] = 'uvloop' + try: + Env() + except ImportError: + pass + del os.environ['EVENT_LOOP_POLICY'] + def test_ANON_LOGS(): assert_boolean('ANON_LOGS', 'anon_logs', False)