diff --git a/docs/ARCHITECTURE.rst b/docs/ARCHITECTURE.rst index 3bd8f5e..98c43c4 100644 --- a/docs/ARCHITECTURE.rst +++ b/docs/ARCHITECTURE.rst @@ -15,13 +15,13 @@ The components of the server are roughly like this:: - ElectrumX -<<<<<- LocalRPC - ------------- ------------ < > - ---------- ------------------- ---------- - - Daemon -<<<<<<<<- Block processor ->>>>- Caches - - ---------- ------------------- ---------- + ---------- ------------------- -------------- + - Daemon -<<<<<<<<- Block processor ->>>>- UTXO Cache - + ---------- ------------------- -------------- < < > < - -------------- ----------- - - Prefetcher - - Storage - - -------------- ----------- + -------------- ---------------- + - Prefetcher - - FS + Storage - + -------------- ---------------- Env diff --git a/electrumx_server.py b/electrumx_server.py index 6a817f8..939bd2e 100755 --- a/electrumx_server.py +++ b/electrumx_server.py @@ -20,20 +20,6 @@ from server.env import Env from server.protocol import BlockServer -def close_loop(loop): - '''Close the loop down cleanly. Cancel and collect remaining tasks.''' - tasks = asyncio.Task.all_tasks() - for task in tasks: - task.cancel() - - try: - loop.run_until_complete(asyncio.gather(*tasks)) - except asyncio.CancelledError: - pass - - loop.close() - - def main_loop(): '''Start the server.''' if os.geteuid() == 0: @@ -45,9 +31,9 @@ def main_loop(): def on_signal(signame): '''Call on receipt of a signal to cleanly shutdown.''' - logging.warning('received {} signal, preparing to shut down' - .format(signame)) - loop.stop() + logging.warning('received {} signal, shutting down'.format(signame)) + for task in asyncio.Task.all_tasks(): + task.cancel() # Install signal handlers for signame in ('SIGINT', 'SIGTERM'): @@ -55,12 +41,14 @@ def main_loop(): partial(on_signal, signame)) server = BlockServer(Env()) - server.start() + future = server.start() try: - loop.run_forever() + loop.run_until_complete(future) + except asyncio.CancelledError: + pass finally: server.stop() - close_loop(loop) + loop.close() def main(): diff --git a/server/block_processor.py b/server/block_processor.py index c995c69..b44a33e 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -77,10 +77,6 @@ class Prefetcher(LoggedClass): else: return blocks, None - def start(self): - '''Start the prefetcher.''' - asyncio.ensure_future(self.main_loop()) - async def main_loop(self): '''Loop forever polling for more blocks.''' self.logger.info('starting daemon poll loop...') @@ -179,7 +175,6 @@ class MemPool(LoggedClass): ''' hex_hashes = set(hex_hashes) touched = set() - initial = self.count < 0 if initial: self.logger.info('beginning import of {:,d} mempool txs' @@ -357,9 +352,9 @@ class BlockProcessor(server.db.DB): self.clean_db() def start(self): - '''Start the block processor.''' - asyncio.ensure_future(self.main_loop()) - self.prefetcher.start() + '''Returns a future that starts the block processor when awaited.''' + return asyncio.gather(self.main_loop(), + self.prefetcher.main_loop()) async def main_loop(self): '''Main loop for block processing.