Clean up startup procedure

This commit is contained in:
Neil Booth 2016-11-09 00:47:49 +09:00
parent a510603b46
commit ed44c6ab73
3 changed files with 17 additions and 34 deletions

View File

@ -15,13 +15,13 @@ The components of the server are roughly like this::
- ElectrumX -<<<<<- LocalRPC - - ElectrumX -<<<<<- LocalRPC -
------------- ------------ ------------- ------------
< > < >
---------- ------------------- ---------- ---------- ------------------- --------------
- Daemon -<<<<<<<<- Block processor ->>>>- Caches - - Daemon -<<<<<<<<- Block processor ->>>>- UTXO Cache -
---------- ------------------- ---------- ---------- ------------------- --------------
< < > < < < > <
-------------- ----------- -------------- ----------------
- Prefetcher - - Storage - - Prefetcher - - FS + Storage -
-------------- ----------- -------------- ----------------
Env Env

View File

@ -20,20 +20,6 @@ from server.env import Env
from server.protocol import BlockServer from server.protocol import BlockServer
def close_loop(loop):
'''Close the loop down cleanly. Cancel and collect remaining tasks.'''
tasks = asyncio.Task.all_tasks()
for task in tasks:
task.cancel()
try:
loop.run_until_complete(asyncio.gather(*tasks))
except asyncio.CancelledError:
pass
loop.close()
def main_loop(): def main_loop():
'''Start the server.''' '''Start the server.'''
if os.geteuid() == 0: if os.geteuid() == 0:
@ -45,9 +31,9 @@ def main_loop():
def on_signal(signame): def on_signal(signame):
'''Call on receipt of a signal to cleanly shutdown.''' '''Call on receipt of a signal to cleanly shutdown.'''
logging.warning('received {} signal, preparing to shut down' logging.warning('received {} signal, shutting down'.format(signame))
.format(signame)) for task in asyncio.Task.all_tasks():
loop.stop() task.cancel()
# Install signal handlers # Install signal handlers
for signame in ('SIGINT', 'SIGTERM'): for signame in ('SIGINT', 'SIGTERM'):
@ -55,12 +41,14 @@ def main_loop():
partial(on_signal, signame)) partial(on_signal, signame))
server = BlockServer(Env()) server = BlockServer(Env())
server.start() future = server.start()
try: try:
loop.run_forever() loop.run_until_complete(future)
except asyncio.CancelledError:
pass
finally: finally:
server.stop() server.stop()
close_loop(loop) loop.close()
def main(): def main():

View File

@ -77,10 +77,6 @@ class Prefetcher(LoggedClass):
else: else:
return blocks, None return blocks, None
def start(self):
'''Start the prefetcher.'''
asyncio.ensure_future(self.main_loop())
async def main_loop(self): async def main_loop(self):
'''Loop forever polling for more blocks.''' '''Loop forever polling for more blocks.'''
self.logger.info('starting daemon poll loop...') self.logger.info('starting daemon poll loop...')
@ -179,7 +175,6 @@ class MemPool(LoggedClass):
''' '''
hex_hashes = set(hex_hashes) hex_hashes = set(hex_hashes)
touched = set() touched = set()
initial = self.count < 0 initial = self.count < 0
if initial: if initial:
self.logger.info('beginning import of {:,d} mempool txs' self.logger.info('beginning import of {:,d} mempool txs'
@ -357,9 +352,9 @@ class BlockProcessor(server.db.DB):
self.clean_db() self.clean_db()
def start(self): def start(self):
'''Start the block processor.''' '''Returns a future that starts the block processor when awaited.'''
asyncio.ensure_future(self.main_loop()) return asyncio.gather(self.main_loop(),
self.prefetcher.start() self.prefetcher.main_loop())
async def main_loop(self): async def main_loop(self):
'''Main loop for block processing. '''Main loop for block processing.