The controller is dead!

This commit is contained in:
Neil Booth 2016-11-08 08:29:56 +09:00
parent 2b028cc065
commit 93d53bdd87
5 changed files with 42 additions and 87 deletions

View File

@ -90,10 +90,3 @@ IRC
Not currently imlpemented; will handle IRC communication for the
ElectrumX servers.
Controller
----------
A historical artefact that currently coordinates some of the above
components. Not pictured as it is doesn't seem to have a logical
place and so is probably going away.

View File

@ -17,11 +17,11 @@ import traceback
from functools import partial
from server.env import Env
from server.controller import Controller
from server.block_processor import BlockServer
def cancel_tasks(loop):
# Cancel and collect the remaining tasks
def close_loop(loop):
'''Close the loop down cleanly. Cancel and collect remaining tasks.'''
tasks = asyncio.Task.all_tasks()
for task in tasks:
task.cancel()
@ -31,41 +31,36 @@ def cancel_tasks(loop):
except asyncio.CancelledError:
pass
loop.close()
def main_loop():
'''Get tasks; loop until complete.'''
'''Start the server.'''
if os.geteuid() == 0:
raise Exception('DO NOT RUN AS ROOT! Create an unpriveleged user '
'account and use that')
env = Env()
logging.info('switching current directory to {}'.format(env.db_dir))
os.chdir(env.db_dir)
loop = asyncio.get_event_loop()
#loop.set_debug(True)
controller = Controller(loop, env)
# Signal handlers
def on_signal(signame):
'''Call on receipt of a signal to cleanly shutdown.'''
logging.warning('received {} signal, preparing to shut down'
.format(signame))
loop.stop()
# Install signal handlers
for signame in ('SIGINT', 'SIGTERM'):
loop.add_signal_handler(getattr(signal, signame),
partial(on_signal, signame))
controller.start()
server = BlockServer(Env())
server.start()
try:
loop.run_forever()
finally:
controller.stop()
cancel_tasks(loop)
loop.close()
server.stop()
close_loop(loop)
def main():

View File

@ -19,7 +19,7 @@ from collections import defaultdict, namedtuple
from functools import partial
from server.cache import FSCache, UTXOCache, NO_CACHE_ENTRY
from server.daemon import DaemonError
from server.daemon import Daemon, DaemonError
from server.protocol import ElectrumX, LocalRPC, JSONRPC
from lib.hash import hash_to_str
from lib.tx import Deserializer
@ -78,7 +78,11 @@ class Prefetcher(LoggedClass):
else:
return blocks, None
async def start(self):
def start(self):
'''Start the prefetcher.'''
asyncio.ensure_future(self.main_loop())
async def main_loop(self):
'''Loop forever polling for more blocks.'''
self.logger.info('starting daemon poll loop...')
while True:
@ -290,14 +294,13 @@ class BlockProcessor(server.db.DB):
Coordinate backing up in case of chain reorganisations.
'''
def __init__(self, env, daemon):
def __init__(self, env):
'''on_update is awaitable, and called only when caught up with the
daemon and a new block arrives or the mempool is updated.'''
super().__init__(env.coin, env.db_engine)
daemon.debug_set_height(self.height)
super().__init__(env)
self.env = env
self.daemon = daemon
self.daemon = Daemon(env.daemon_url, env.debug)
self.daemon.debug_set_height(self.height)
self.mempool = MemPool(self)
self.touched = set()
@ -310,7 +313,7 @@ class BlockProcessor(server.db.DB):
# Headers and tx_hashes have one entry per block
self.history = defaultdict(partial(array.array, 'I'))
self.history_size = 0
self.prefetcher = Prefetcher(daemon, self.height)
self.prefetcher = Prefetcher(self.daemon, self.height)
self.last_flush = time.time()
self.last_flush_tx_count = self.tx_count
@ -332,11 +335,13 @@ class BlockProcessor(server.db.DB):
self.clean_db()
def coros(self):
return [self.start(), self.prefetcher.start()]
def start(self):
'''Start the block processor.'''
asyncio.ensure_future(self.main_loop())
self.prefetcher.start()
async def start(self):
'''External entry point for block processing.
async def main_loop(self):
'''Main loop for block processing.
Safely flushes the DB on clean shutdown.
'''
@ -808,10 +813,10 @@ class BlockProcessor(server.db.DB):
class BlockServer(BlockProcessor):
'''Like BlockProcessor but also starts servers when caught up.'''
def __init__(self, env, daemon):
def __init__(self, env):
'''on_update is awaitable, and called only when caught up with the
daemon and a new block arrives or the mempool is updated.'''
super().__init__(env, daemon)
super().__init__(env)
self.servers = []
async def caught_up(self, mempool_hashes):

View File

@ -1,44 +0,0 @@
# Copyright (c) 2016, Neil Booth
#
# All rights reserved.
#
# See the file "LICENCE" for information about the copyright
# and warranty status of this software.
'''Server controller.
Coordinates the parts of the server. Serves as a cache for
client-serving data such as histories.
'''
import asyncio
from server.daemon import Daemon
from server.block_processor import BlockServer
from lib.util import LoggedClass
class Controller(LoggedClass):
def __init__(self, loop, env):
'''Create up the controller.
Creates DB, Daemon and BlockProcessor instances.
'''
super().__init__()
self.loop = loop
self.env = env
self.coin = env.coin
self.daemon = Daemon(env.daemon_url, env.debug)
self.block_processor = BlockServer(env, self.daemon)
def start(self):
'''Prime the event loop with asynchronous jobs.'''
coros = self.block_processor.coros()
for coro in coros:
asyncio.ensure_future(coro)
def stop(self):
'''Close the listening servers.'''
self.block_processor.stop()

View File

@ -9,6 +9,7 @@
import array
import ast
import os
import struct
from collections import namedtuple
@ -26,19 +27,24 @@ class DB(LoggedClass):
it was shutdown uncleanly.
'''
def __init__(self, coin, db_engine):
def __init__(self, env):
super().__init__()
self.coin = coin
self.env = env
self.coin = env.coin
self.logger.info('switching current directory to {}'
.format(env.db_dir))
os.chdir(env.db_dir)
# Open DB and metadata files. Record some of its state.
db_name = '{}-{}'.format(coin.NAME, coin.NET)
self.db = open_db(db_name, db_engine)
db_name = '{}-{}'.format(self.coin.NAME, self.coin.NET)
self.db = open_db(db_name, env.db_engine)
if self.db.is_new:
self.logger.info('created new {} database {}'
.format(db_engine, db_name))
.format(env.db_engine, db_name))
else:
self.logger.info('successfully opened {} database {}'
.format(db_engine, db_name))
.format(env.db_engine, db_name))
self.init_state_from_db()
self.tx_count = self.db_tx_count