From 93d53bdd8772721f1eb4d35c4e99f88466f922a7 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Tue, 8 Nov 2016 08:29:56 +0900 Subject: [PATCH] The controller is dead! --- docs/ARCHITECTURE.rst | 7 ------- electrumx_server.py | 27 ++++++++++-------------- server/block_processor.py | 33 ++++++++++++++++------------- server/controller.py | 44 --------------------------------------- server/db.py | 18 ++++++++++------ 5 files changed, 42 insertions(+), 87 deletions(-) delete mode 100644 server/controller.py diff --git a/docs/ARCHITECTURE.rst b/docs/ARCHITECTURE.rst index 3af6c99..3bd8f5e 100644 --- a/docs/ARCHITECTURE.rst +++ b/docs/ARCHITECTURE.rst @@ -90,10 +90,3 @@ IRC Not currently imlpemented; will handle IRC communication for the ElectrumX servers. - -Controller ----------- - -A historical artefact that currently coordinates some of the above -components. Not pictured as it is doesn't seem to have a logical -place and so is probably going away. diff --git a/electrumx_server.py b/electrumx_server.py index 9429883..94d65c8 100755 --- a/electrumx_server.py +++ b/electrumx_server.py @@ -17,11 +17,11 @@ import traceback from functools import partial from server.env import Env -from server.controller import Controller +from server.block_processor import BlockServer -def cancel_tasks(loop): - # Cancel and collect the remaining tasks +def close_loop(loop): + '''Close the loop down cleanly. Cancel and collect remaining tasks.''' tasks = asyncio.Task.all_tasks() for task in tasks: task.cancel() @@ -31,41 +31,36 @@ def cancel_tasks(loop): except asyncio.CancelledError: pass + loop.close() + def main_loop(): - '''Get tasks; loop until complete.''' + '''Start the server.''' if os.geteuid() == 0: raise Exception('DO NOT RUN AS ROOT! Create an unpriveleged user ' 'account and use that') - env = Env() - logging.info('switching current directory to {}'.format(env.db_dir)) - os.chdir(env.db_dir) - loop = asyncio.get_event_loop() #loop.set_debug(True) - controller = Controller(loop, env) - - # Signal handlers def on_signal(signame): '''Call on receipt of a signal to cleanly shutdown.''' logging.warning('received {} signal, preparing to shut down' .format(signame)) loop.stop() + # Install signal handlers for signame in ('SIGINT', 'SIGTERM'): loop.add_signal_handler(getattr(signal, signame), partial(on_signal, signame)) - controller.start() + server = BlockServer(Env()) + server.start() try: loop.run_forever() finally: - controller.stop() - cancel_tasks(loop) - - loop.close() + server.stop() + close_loop(loop) def main(): diff --git a/server/block_processor.py b/server/block_processor.py index c58284a..0309491 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -19,7 +19,7 @@ from collections import defaultdict, namedtuple from functools import partial from server.cache import FSCache, UTXOCache, NO_CACHE_ENTRY -from server.daemon import DaemonError +from server.daemon import Daemon, DaemonError from server.protocol import ElectrumX, LocalRPC, JSONRPC from lib.hash import hash_to_str from lib.tx import Deserializer @@ -78,7 +78,11 @@ class Prefetcher(LoggedClass): else: return blocks, None - async def start(self): + def start(self): + '''Start the prefetcher.''' + asyncio.ensure_future(self.main_loop()) + + async def main_loop(self): '''Loop forever polling for more blocks.''' self.logger.info('starting daemon poll loop...') while True: @@ -290,14 +294,13 @@ class BlockProcessor(server.db.DB): Coordinate backing up in case of chain reorganisations. ''' - def __init__(self, env, daemon): + def __init__(self, env): '''on_update is awaitable, and called only when caught up with the daemon and a new block arrives or the mempool is updated.''' - super().__init__(env.coin, env.db_engine) - daemon.debug_set_height(self.height) + super().__init__(env) - self.env = env - self.daemon = daemon + self.daemon = Daemon(env.daemon_url, env.debug) + self.daemon.debug_set_height(self.height) self.mempool = MemPool(self) self.touched = set() @@ -310,7 +313,7 @@ class BlockProcessor(server.db.DB): # Headers and tx_hashes have one entry per block self.history = defaultdict(partial(array.array, 'I')) self.history_size = 0 - self.prefetcher = Prefetcher(daemon, self.height) + self.prefetcher = Prefetcher(self.daemon, self.height) self.last_flush = time.time() self.last_flush_tx_count = self.tx_count @@ -332,11 +335,13 @@ class BlockProcessor(server.db.DB): self.clean_db() - def coros(self): - return [self.start(), self.prefetcher.start()] + def start(self): + '''Start the block processor.''' + asyncio.ensure_future(self.main_loop()) + self.prefetcher.start() - async def start(self): - '''External entry point for block processing. + async def main_loop(self): + '''Main loop for block processing. Safely flushes the DB on clean shutdown. ''' @@ -808,10 +813,10 @@ class BlockProcessor(server.db.DB): class BlockServer(BlockProcessor): '''Like BlockProcessor but also starts servers when caught up.''' - def __init__(self, env, daemon): + def __init__(self, env): '''on_update is awaitable, and called only when caught up with the daemon and a new block arrives or the mempool is updated.''' - super().__init__(env, daemon) + super().__init__(env) self.servers = [] async def caught_up(self, mempool_hashes): diff --git a/server/controller.py b/server/controller.py deleted file mode 100644 index dffae97..0000000 --- a/server/controller.py +++ /dev/null @@ -1,44 +0,0 @@ -# Copyright (c) 2016, Neil Booth -# -# All rights reserved. -# -# See the file "LICENCE" for information about the copyright -# and warranty status of this software. - -'''Server controller. - -Coordinates the parts of the server. Serves as a cache for -client-serving data such as histories. -''' - -import asyncio - -from server.daemon import Daemon -from server.block_processor import BlockServer -from lib.util import LoggedClass - - -class Controller(LoggedClass): - - def __init__(self, loop, env): - '''Create up the controller. - - Creates DB, Daemon and BlockProcessor instances. - ''' - super().__init__() - self.loop = loop - self.env = env - self.coin = env.coin - self.daemon = Daemon(env.daemon_url, env.debug) - self.block_processor = BlockServer(env, self.daemon) - - def start(self): - '''Prime the event loop with asynchronous jobs.''' - coros = self.block_processor.coros() - - for coro in coros: - asyncio.ensure_future(coro) - - def stop(self): - '''Close the listening servers.''' - self.block_processor.stop() diff --git a/server/db.py b/server/db.py index f253213..0256ff4 100644 --- a/server/db.py +++ b/server/db.py @@ -9,6 +9,7 @@ import array import ast +import os import struct from collections import namedtuple @@ -26,19 +27,24 @@ class DB(LoggedClass): it was shutdown uncleanly. ''' - def __init__(self, coin, db_engine): + def __init__(self, env): super().__init__() - self.coin = coin + self.env = env + self.coin = env.coin + + self.logger.info('switching current directory to {}' + .format(env.db_dir)) + os.chdir(env.db_dir) # Open DB and metadata files. Record some of its state. - db_name = '{}-{}'.format(coin.NAME, coin.NET) - self.db = open_db(db_name, db_engine) + db_name = '{}-{}'.format(self.coin.NAME, self.coin.NET) + self.db = open_db(db_name, env.db_engine) if self.db.is_new: self.logger.info('created new {} database {}' - .format(db_engine, db_name)) + .format(env.db_engine, db_name)) else: self.logger.info('successfully opened {} database {}' - .format(db_engine, db_name)) + .format(env.db_engine, db_name)) self.init_state_from_db() self.tx_count = self.db_tx_count