From bab0d162dee7ab156a48d5d5c54649da8f0c00cc Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Wed, 23 Nov 2016 15:07:16 +0900 Subject: [PATCH] Merge ServerManager and BlockServer --- electrumx_server.py | 4 +- server/block_processor.py | 33 +++------ server/protocol.py | 142 ++++++++++++++++---------------------- 3 files changed, 73 insertions(+), 106 deletions(-) diff --git a/electrumx_server.py b/electrumx_server.py index d1ef476..940828e 100755 --- a/electrumx_server.py +++ b/electrumx_server.py @@ -17,7 +17,7 @@ import traceback from functools import partial from server.env import Env -from server.protocol import BlockServer +from server.protocol import ServerManager SUPPRESS_MESSAGES = [ 'Fatal read error on socket transport', @@ -45,7 +45,7 @@ def main_loop(): 'accept_connection2()' in repr(context.get('task'))): loop.default_exception_handler(context) - server = BlockServer(Env()) + server = ServerManager(Env()) future = asyncio.ensure_future(server.main_loop()) # Install signal handlers diff --git a/server/block_processor.py b/server/block_processor.py index f911e51..d529659 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -133,9 +133,11 @@ class BlockProcessor(server.db.DB): Coordinate backing up in case of chain reorganisations. ''' - def __init__(self, env): + def __init__(self, client, env): super().__init__(env) + self.client = client + # These are our state as we move ahead of DB state self.fs_height = self.db_height self.fs_tx_count = self.db_tx_count @@ -146,7 +148,6 @@ class BlockProcessor(server.db.DB): self.daemon = Daemon(self.coin.daemon_urls(env.daemon_url)) self.caught_up = False self.touched = set() - self.futures = [] # Meta self.utxo_MB = env.utxo_MB @@ -185,7 +186,7 @@ class BlockProcessor(server.db.DB): Safely flushes the DB on clean shutdown. ''' - self.futures.append(asyncio.ensure_future(self.prefetcher.main_loop())) + prefetcher_loop = asyncio.ensure_future(self.prefetcher.main_loop()) # Simulate a reorg if requested if self.env.force_reorg > 0: @@ -197,20 +198,11 @@ class BlockProcessor(server.db.DB): while True: await self._wait_for_update() except asyncio.CancelledError: - self.on_cancel() - await self.wait_shutdown() + pass - def on_cancel(self): - '''Called when the main loop is cancelled. - - Intended to be overridden in derived classes.''' - for future in self.futures: - future.cancel() + prefetcher_loop.cancel() self.flush(True) - - async def wait_shutdown(self): - '''Wait for shutdown to complete cleanly, and return.''' - await asyncio.sleep(0) + await self.client.shutdown() async def _wait_for_update(self): '''Wait for the prefetcher to deliver blocks. @@ -237,26 +229,21 @@ class BlockProcessor(server.db.DB): # Flush everything as queries are performed on the DB and # not in-memory. self.flush(True) - self.notify(self.touched) + self.client.notify(self.touched) elif time.time() > self.next_cache_check: self.check_cache_size() self.next_cache_check = time.time() + 60 self.touched = set() async def first_caught_up(self): - '''Called after each deamon poll if caught up.''' + '''Called when first caught up after start, or after a reorg.''' self.caught_up = True if self.first_sync: self.first_sync = False self.logger.info('{} synced to height {:,d}. DB version:' .format(VERSION, self.height, self.db_version)) self.flush(True) - - def notify(self, touched): - '''Called with list of touched addresses by new blocks. - - Only called for blocks found after first_caught_up is called. - Intended to be overridden in derived classes.''' + await self.client.first_caught_up() async def handle_chain_reorg(self, count): '''Handle a chain reorganisation. diff --git a/server/protocol.py b/server/protocol.py index 94261df..dc452e9 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -20,63 +20,14 @@ from functools import partial from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash from lib.jsonrpc import JSONRPC, json_notification_payload from lib.tx import Deserializer -from lib.util import LoggedClass +import lib.util as util from server.block_processor import BlockProcessor from server.daemon import DaemonError from server.irc import IRC from server.version import VERSION -class BlockServer(BlockProcessor): - '''Like BlockProcessor but also has a mempool and a server manager. - - Servers are started immediately the block processor first catches - up with the daemon. - ''' - - def __init__(self, env): - super().__init__(env) - self.server_mgr = ServerManager(self, env) - self.mempool = MemPool(self) - - async def first_caught_up(self): - # Call the base class to flush and log first - await super().first_caught_up() - await self.server_mgr.start_servers() - self.futures.append(self.mempool.start()) - - def notify(self, touched): - '''Called when addresses are touched by new blocks or mempool - updates.''' - self.server_mgr.notify(self.height, touched) - - def on_cancel(self): - '''Called when the main loop is cancelled.''' - self.server_mgr.stop() - super().on_cancel() - - async def wait_shutdown(self): - '''Wait for shutdown to complete cleanly, and return.''' - await self.server_mgr.wait_shutdown() - await super().wait_shutdown() - - def mempool_transactions(self, hash168): - '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool - entries for the hash168. - - unconfirmed is True if any txin is unconfirmed. - ''' - return self.mempool.transactions(hash168) - - def mempool_value(self, hash168): - '''Return the unconfirmed amount in the mempool for hash168. - - Can be positive or negative. - ''' - return self.mempool.value(hash168) - - -class MemPool(LoggedClass): +class MemPool(util.LoggedClass): '''Representation of the daemon's mempool. Updated regularly in caught-up state. Goal is to enable efficient @@ -91,11 +42,14 @@ class MemPool(LoggedClass): tx's txins are unconfirmed. tx hashes are hex strings. ''' - def __init__(self, bp): + def __init__(self, daemon, coin, db, manager): super().__init__() + self.daemon = daemon + self.coin = coin + self.db = db + self.manager = manager self.txs = {} self.hash168s = defaultdict(set) # None can be a key - self.bp = bp self.count = -1 def start(self): @@ -120,7 +74,7 @@ class MemPool(LoggedClass): Remove transactions that are no longer in our mempool. Request new transactions we don't have then add to our mempool. ''' - hex_hashes = set(await self.bp.daemon.mempool_hashes()) + hex_hashes = set(await self.daemon.mempool_hashes()) touched = set() missing_utxos = [] @@ -145,7 +99,7 @@ class MemPool(LoggedClass): # ones the daemon no longer has (it will return None). Put # them into a dictionary of hex hash to deserialized tx. hex_hashes.difference_update(self.txs) - raw_txs = await self.bp.daemon.getrawtransactions(hex_hashes) + raw_txs = await self.daemon.getrawtransactions(hex_hashes) if initial: self.logger.info('analysing {:,d} mempool txs' .format(len(raw_txs))) @@ -155,8 +109,8 @@ class MemPool(LoggedClass): # The mempool is unordered, so process all outputs first so # that looking for inputs has full info. - script_hash168 = self.bp.coin.hash168_from_script() - db_utxo_lookup = self.bp.db_utxo_lookup + script_hash168 = self.coin.hash168_from_script() + db_utxo_lookup = self.db.db_utxo_lookup def txout_pair(txout): return (script_hash168(txout.pk_script), txout.value) @@ -195,7 +149,7 @@ class MemPool(LoggedClass): try: infos = (txin_info(txin) for txin in tx.inputs) txin_pairs, unconfs = zip(*infos) - except self.bp.MissingUTXOError: + except self.db.MissingUTXOError: # Drop this TX. If other mempool txs depend on it # it's harmless - next time the mempool is refreshed # they'll either be cleaned up or the UTXOs will no @@ -227,7 +181,7 @@ class MemPool(LoggedClass): self.logger.info('{:,d} txs touching {:,d} addresses' .format(len(self.txs), len(self.hash168s))) - self.bp.notify(touched) + self.maanger.notify(touched) def transactions(self, hash168): '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool @@ -254,14 +208,19 @@ class MemPool(LoggedClass): return value -class ServerManager(LoggedClass): - '''Manages the servers.''' +class ServerManager(util.LoggedClass): + '''Manages the client servers, a mempool, and a block processor. + + Servers are started immediately the block processor first catches + up with the daemon. + ''' MgrTask = namedtuple('MgrTask', 'session task') - def __init__(self, bp, env): + def __init__(self, env): super().__init__() - self.bp = bp + self.bp = BlockProcessor(self, env) + self.mempool = MemPool(self.db.daemon, env.coin, self.bp, self) self.env = env self.servers = [] self.irc = IRC(env) @@ -269,11 +228,27 @@ class ServerManager(LoggedClass): self.max_subs = env.max_subs self.subscription_count = 0 self.irc_future = None + self.mempool_future = None self.logger.info('max subscriptions across all sessions: {:,d}' .format(self.max_subs)) self.logger.info('max subscriptions per session: {:,d}' .format(env.max_session_subs)) + def mempool_transactions(self, hash168): + '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool + entries for the hash168. + + unconfirmed is True if any txin is unconfirmed. + ''' + return self.mempool.transactions(hash168) + + def mempool_value(self, hash168): + '''Return the unconfirmed amount in the mempool for hash168. + + Can be positive or negative. + ''' + return self.mempool.value(hash168) + async def start_server(self, kind, *args, **kw_args): loop = asyncio.get_event_loop() protocol_class = LocalRPC if kind == 'RPC' else ElectrumX @@ -315,35 +290,40 @@ class ServerManager(LoggedClass): else: self.logger.info('IRC disabled') - def notify(self, height, touched): + async def first_caught_up(self): + if not self.mempool_future: + self.mempool_future = self.mempool.start() + await self.server_mgr.start_servers() + + def notify(self, touched): '''Notify sessions about height changes and touched addresses.''' cache = {} for session in self.sessions: if isinstance(session, ElectrumX): # Use a tuple to distinguish from JSON - session.jobs.put_nowait((height, touched, cache)) + session.jobs.put_nowait((self.bp.height, touched, cache)) - def stop(self): - '''Close listening servers.''' + async def shutdown(self): + '''Call to shutdown the servers. Returns when done.''' for server in self.servers: server.close() - if self.irc_future: - self.irc_future.cancel() - if self.sessions: - self.logger.info('cleanly closing client sessions, please wait...') - for session in self.sessions: - self.close_session(session) - - async def wait_shutdown(self): - # Wait for servers to close for server in self.servers: await server.wait_closed() self.servers = [] - secs = 60 + if self.irc_future: + self.irc_future.cancel() + if self.mempool_future: + self.mempool_future.cancel() + if self.sessions: + await self.close_sessions() + + async def close_sessions(self, secs=60): + self.logger.info('cleanly closing client sessions, please wait...') + for session in self.sessions: + self.close_session(session) self.logger.info('server listening sockets closed, waiting ' '{:d} seconds for socket cleanup'.format(secs)) - limit = time.time() + secs while self.sessions and time.time() < limit: await asyncio.sleep(4) @@ -628,7 +608,7 @@ class ElectrumX(Session): # Note history is ordered and mempool unordered in electrum-server # For mempool, height is -1 if unconfirmed txins, otherwise 0 history = await self.async_get_history(hash168) - mempool = self.bp.mempool_transactions(hash168) + mempool = self.manager.mempool_transactions(hash168) status = ''.join('{}:{:d}:'.format(hash_to_str(tx_hash), height) for tx_hash, height in history) @@ -666,7 +646,7 @@ class ElectrumX(Session): def unconfirmed_history(self, hash168): # Note unconfirmed history is unordered in electrum-server # Height is -1 if unconfirmed txins, otherwise 0 - mempool = self.bp.mempool_transactions(hash168) + mempool = self.manager.mempool_transactions(hash168) return [{'tx_hash': tx_hash, 'height': -unconfirmed, 'fee': fee} for tx_hash, fee, unconfirmed in mempool] @@ -707,7 +687,7 @@ class ElectrumX(Session): async def get_balance(self, hash168): utxos = await self.get_utxos(hash168) confirmed = sum(utxo.value for utxo in utxos) - unconfirmed = self.bp.mempool_value(hash168) + unconfirmed = self.manager.mempool_value(hash168) return {'confirmed': confirmed, 'unconfirmed': unconfirmed} async def list_unspent(self, hash168):