Add EVENT_LOOP_POLICY environment variable
Based on #215 by JustinTArthur. - Accept EVENT_LOOP_POLICY of uvloop - Move initilisation from electrumx_server to controller - SocksProxy now requires a loop - Update tests
This commit is contained in:
parent
1f3e942cbc
commit
137236712f
@ -105,6 +105,14 @@ These environment variables are optional:
|
|||||||
to blank. The default is appropriate for **COIN** and **NET**
|
to blank. The default is appropriate for **COIN** and **NET**
|
||||||
(e.g., 8000 for Bitcoin mainnet) if not set.
|
(e.g., 8000 for Bitcoin mainnet) if not set.
|
||||||
|
|
||||||
|
* **EVENT_LOOP_POLICY**
|
||||||
|
|
||||||
|
The name of an event loop policy to replace the default asyncio
|
||||||
|
policy, if any. At present only `uvloop` is accepted, in which case
|
||||||
|
you must have installed the `uvloop`_ Python package.
|
||||||
|
|
||||||
|
If you are not sure what this means leave it unset.
|
||||||
|
|
||||||
* **DONATION_ADDRESS**
|
* **DONATION_ADDRESS**
|
||||||
|
|
||||||
The server donation address reported to Electrum clients. Defaults
|
The server donation address reported to Electrum clients. Defaults
|
||||||
@ -372,3 +380,4 @@ your available physical RAM:
|
|||||||
variables is roughly equivalent.
|
variables is roughly equivalent.
|
||||||
|
|
||||||
.. _lib/coins.py: https://github.com/kyuupichan/electrumx/blob/master/lib/coins.py
|
.. _lib/coins.py: https://github.com/kyuupichan/electrumx/blob/master/lib/coins.py
|
||||||
|
.. _uvloop: https://pypi.python.org/pypi/uvloop
|
||||||
|
|||||||
@ -9,70 +9,20 @@
|
|||||||
|
|
||||||
'''Script to kick off the server.'''
|
'''Script to kick off the server.'''
|
||||||
|
|
||||||
import asyncio
|
|
||||||
import logging
|
import logging
|
||||||
import os
|
|
||||||
import signal
|
|
||||||
import sys
|
|
||||||
import traceback
|
import traceback
|
||||||
from functools import partial
|
|
||||||
|
|
||||||
from server.env import Env
|
from server.env import Env
|
||||||
from server.controller import Controller
|
from server.controller import Controller
|
||||||
|
|
||||||
|
|
||||||
SUPPRESS_MESSAGES = [
|
|
||||||
'Fatal read error on socket transport',
|
|
||||||
'Fatal write error on socket transport',
|
|
||||||
]
|
|
||||||
|
|
||||||
|
|
||||||
def main_loop():
|
|
||||||
'''Start the server.'''
|
|
||||||
if sys.version_info < (3, 5, 3):
|
|
||||||
raise RuntimeError('Python >= 3.5.3 is required to run ElectrumX')
|
|
||||||
|
|
||||||
if os.geteuid() == 0:
|
|
||||||
raise RuntimeError('DO NOT RUN AS ROOT! Create an unprivileged user '
|
|
||||||
'account and use that')
|
|
||||||
|
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
# loop.set_debug(True)
|
|
||||||
|
|
||||||
def on_signal(signame):
|
|
||||||
'''Call on receipt of a signal to cleanly shutdown.'''
|
|
||||||
logging.warning('received {} signal, initiating shutdown'
|
|
||||||
.format(signame))
|
|
||||||
controller.initiate_shutdown()
|
|
||||||
|
|
||||||
def on_exception(loop, context):
|
|
||||||
'''Suppress spurious messages it appears we cannot control.'''
|
|
||||||
message = context.get('message')
|
|
||||||
if message not in SUPPRESS_MESSAGES:
|
|
||||||
if not ('task' in context and
|
|
||||||
'accept_connection2()' in repr(context.get('task'))):
|
|
||||||
loop.default_exception_handler(context)
|
|
||||||
|
|
||||||
controller = Controller(Env())
|
|
||||||
future = asyncio.ensure_future(controller.main_loop())
|
|
||||||
|
|
||||||
# Install signal handlers
|
|
||||||
for signame in ('SIGINT', 'SIGTERM'):
|
|
||||||
loop.add_signal_handler(getattr(signal, signame),
|
|
||||||
partial(on_signal, signame))
|
|
||||||
|
|
||||||
# Install exception handler
|
|
||||||
loop.set_exception_handler(on_exception)
|
|
||||||
loop.run_until_complete(future)
|
|
||||||
loop.close()
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
'''Set up logging, enter main loop.'''
|
'''Set up logging and run the server.'''
|
||||||
logging.basicConfig(level=logging.INFO)
|
logging.basicConfig(level=logging.INFO)
|
||||||
logging.info('ElectrumX server starting')
|
logging.info('ElectrumX server starting')
|
||||||
try:
|
try:
|
||||||
main_loop()
|
controller = Controller(Env())
|
||||||
|
controller.run()
|
||||||
except Exception:
|
except Exception:
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
logging.critical('ElectrumX server terminated abnormally')
|
logging.critical('ElectrumX server terminated abnormally')
|
||||||
|
|||||||
@ -136,7 +136,7 @@ class Socks(util.LoggedClass):
|
|||||||
|
|
||||||
class SocksProxy(util.LoggedClass):
|
class SocksProxy(util.LoggedClass):
|
||||||
|
|
||||||
def __init__(self, host, port, loop=None):
|
def __init__(self, host, port, loop):
|
||||||
'''Host can be an IPv4 address, IPv6 address, or a host name.
|
'''Host can be an IPv4 address, IPv6 address, or a host name.
|
||||||
Port can be None, in which case one is auto-detected.'''
|
Port can be None, in which case one is auto-detected.'''
|
||||||
super().__init__()
|
super().__init__()
|
||||||
@ -147,7 +147,7 @@ class SocksProxy(util.LoggedClass):
|
|||||||
self.ip_addr = None
|
self.ip_addr = None
|
||||||
self.lost_event = asyncio.Event()
|
self.lost_event = asyncio.Event()
|
||||||
self.tried_event = asyncio.Event()
|
self.tried_event = asyncio.Event()
|
||||||
self.loop = loop or asyncio.get_event_loop()
|
self.loop = loop
|
||||||
self.set_lost()
|
self.set_lost()
|
||||||
|
|
||||||
async def auto_detect_loop(self):
|
async def auto_detect_loop(self):
|
||||||
|
|||||||
@ -8,7 +8,9 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
|
import signal
|
||||||
import ssl
|
import ssl
|
||||||
|
import sys
|
||||||
import time
|
import time
|
||||||
import traceback
|
import traceback
|
||||||
from bisect import bisect_left
|
from bisect import bisect_left
|
||||||
@ -37,12 +39,29 @@ class Controller(util.LoggedClass):
|
|||||||
|
|
||||||
BANDS = 5
|
BANDS = 5
|
||||||
CATCHING_UP, LISTENING, PAUSED, SHUTTING_DOWN = range(4)
|
CATCHING_UP, LISTENING, PAUSED, SHUTTING_DOWN = range(4)
|
||||||
|
SUPPRESS_MESSAGES = [
|
||||||
|
'Fatal read error on socket transport',
|
||||||
|
'Fatal write error on socket transport',
|
||||||
|
]
|
||||||
|
|
||||||
def __init__(self, env):
|
def __init__(self, env):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
|
|
||||||
|
# Sanity checks
|
||||||
|
if sys.version_info < (3, 5, 3):
|
||||||
|
raise RuntimeError('Python >= 3.5.3 is required to run ElectrumX')
|
||||||
|
|
||||||
|
if os.geteuid() == 0:
|
||||||
|
raise RuntimeError('DO NOT RUN AS ROOT! Create an unprivileged '
|
||||||
|
'user account and use that')
|
||||||
|
|
||||||
|
# Set the event loop policy before doing anything asyncio
|
||||||
|
self.logger.info('event loop policy: {}'.format(env.loop_policy))
|
||||||
|
asyncio.set_event_loop_policy(env.loop_policy)
|
||||||
|
self.loop = asyncio.get_event_loop()
|
||||||
|
|
||||||
# Set this event to cleanly shutdown
|
# Set this event to cleanly shutdown
|
||||||
self.shutdown_event = asyncio.Event()
|
self.shutdown_event = asyncio.Event()
|
||||||
self.loop = asyncio.get_event_loop()
|
|
||||||
self.executor = ThreadPoolExecutor()
|
self.executor = ThreadPoolExecutor()
|
||||||
self.loop.set_default_executor(self.executor)
|
self.loop.set_default_executor(self.executor)
|
||||||
self.start_time = time.time()
|
self.start_time = time.time()
|
||||||
@ -872,3 +891,32 @@ class Controller(util.LoggedClass):
|
|||||||
def donation_address(self):
|
def donation_address(self):
|
||||||
'''Return the donation address as a string, empty if there is none.'''
|
'''Return the donation address as a string, empty if there is none.'''
|
||||||
return self.env.donation_address
|
return self.env.donation_address
|
||||||
|
|
||||||
|
# Signal, exception handlers.
|
||||||
|
|
||||||
|
def on_signal(self, signame):
|
||||||
|
'''Call on receipt of a signal to cleanly shutdown.'''
|
||||||
|
self.logger.warning('received {} signal, initiating shutdown'
|
||||||
|
.format(signame))
|
||||||
|
self.initiate_shutdown()
|
||||||
|
|
||||||
|
def on_exception(self, loop, context):
|
||||||
|
'''Suppress spurious messages it appears we cannot control.'''
|
||||||
|
message = context.get('message')
|
||||||
|
if message not in self.SUPPRESS_MESSAGES:
|
||||||
|
if not ('task' in context and
|
||||||
|
'accept_connection2()' in repr(context.get('task'))):
|
||||||
|
loop.default_exception_handler(context)
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
# Install signal handlers and exception handler
|
||||||
|
loop = self.loop
|
||||||
|
for signame in ('SIGINT', 'SIGTERM'):
|
||||||
|
loop.add_signal_handler(getattr(signal, signame),
|
||||||
|
partial(self.on_signal, signame))
|
||||||
|
loop.set_exception_handler(self.on_exception)
|
||||||
|
|
||||||
|
# Run the main loop to completion
|
||||||
|
future = asyncio.ensure_future(self.main_loop())
|
||||||
|
loop.run_until_complete(future)
|
||||||
|
loop.close()
|
||||||
|
|||||||
@ -66,6 +66,7 @@ class Env(lib_util.LoggedClass):
|
|||||||
self.max_session_subs = self.integer('MAX_SESSION_SUBS', 50000)
|
self.max_session_subs = self.integer('MAX_SESSION_SUBS', 50000)
|
||||||
self.bandwidth_limit = self.integer('BANDWIDTH_LIMIT', 2000000)
|
self.bandwidth_limit = self.integer('BANDWIDTH_LIMIT', 2000000)
|
||||||
self.session_timeout = self.integer('SESSION_TIMEOUT', 600)
|
self.session_timeout = self.integer('SESSION_TIMEOUT', 600)
|
||||||
|
self.loop_policy = self.event_loop_policy()
|
||||||
# IRC
|
# IRC
|
||||||
self.irc = self.boolean('IRC', False)
|
self.irc = self.boolean('IRC', False)
|
||||||
self.irc_nick = self.default('IRC_NICK', None)
|
self.irc_nick = self.default('IRC_NICK', None)
|
||||||
@ -173,3 +174,12 @@ class Env(lib_util.LoggedClass):
|
|||||||
ssl_port,
|
ssl_port,
|
||||||
'_tor',
|
'_tor',
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def event_loop_policy(self):
|
||||||
|
policy = self.default('EVENT_LOOP_POLICY', None)
|
||||||
|
if policy is None:
|
||||||
|
return None
|
||||||
|
if policy == 'uvloop':
|
||||||
|
import uvloop
|
||||||
|
return uvloop.EventLoopPolicy()
|
||||||
|
raise self.Error('unknown event loop policy "{}"'.format(policy))
|
||||||
|
|||||||
@ -158,6 +158,19 @@ def test_BANNER_FILE():
|
|||||||
assert e.banner_file == 'banner_file'
|
assert e.banner_file == 'banner_file'
|
||||||
assert e.tor_banner_file == 'tor_banner_file'
|
assert e.tor_banner_file == 'tor_banner_file'
|
||||||
|
|
||||||
|
def test_EVENT_LOOP_POLICY():
|
||||||
|
e = Env()
|
||||||
|
assert e.loop_policy is None
|
||||||
|
os.environ['EVENT_LOOP_POLICY'] = 'foo'
|
||||||
|
with pytest.raises(Env.Error):
|
||||||
|
Env()
|
||||||
|
os.environ['EVENT_LOOP_POLICY'] = 'uvloop'
|
||||||
|
try:
|
||||||
|
Env()
|
||||||
|
except ImportError:
|
||||||
|
pass
|
||||||
|
del os.environ['EVENT_LOOP_POLICY']
|
||||||
|
|
||||||
def test_ANON_LOGS():
|
def test_ANON_LOGS():
|
||||||
assert_boolean('ANON_LOGS', 'anon_logs', False)
|
assert_boolean('ANON_LOGS', 'anon_logs', False)
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user