From 391e69b66c86040619f1e6adcb2ba54682284654 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Wed, 18 Jul 2018 13:48:02 +0800 Subject: [PATCH] Introduce ChainState object - reduces the tangle of object dependencies - rationalizes responsibilities --- electrumx/server/chain_state.py | 112 +++++++++++++++++++ electrumx/server/controller.py | 55 +++------- electrumx/server/mempool.py | 36 +++---- electrumx/server/peers.py | 9 +- electrumx/server/session.py | 185 +++++++++++++------------------- 5 files changed, 223 insertions(+), 174 deletions(-) create mode 100644 electrumx/server/chain_state.py diff --git a/electrumx/server/chain_state.py b/electrumx/server/chain_state.py new file mode 100644 index 0000000..59ed1ac --- /dev/null +++ b/electrumx/server/chain_state.py @@ -0,0 +1,112 @@ +# Copyright (c) 2016-2018, Neil Booth +# +# All rights reserved. +# +# See the file "LICENCE" for information about the copyright +# and warranty status of this software. + + +import pylru + +from electrumx.server.mempool import MemPool + + +class ChainState(object): + '''Used as an interface by servers to request information about + blocks, transaction history, UTXOs and the mempool. + ''' + + def __init__(self, env, tasks, shutdown_event): + self.env = env + self.tasks = tasks + self.shutdown_event = shutdown_event + self.daemon = env.coin.DAEMON(env) + self.bp = env.coin.BLOCK_PROCESSOR(env, tasks, self.daemon) + self.mempool = MemPool(env.coin, self, self.tasks, + self.bp.add_new_block_callback) + self.history_cache = pylru.lrucache(256) + # External interface: pass-throughs for mempool.py + self.cached_mempool_hashes = self.daemon.cached_mempool_hashes + self.mempool_refresh_event = self.daemon.mempool_refresh_event + self.getrawtransactions = self.daemon.getrawtransactions + self.utxo_lookup = self.bp.db_utxo_lookup + # External interface pass-throughs for session.py + self.force_chain_reorg = self.bp.force_chain_reorg + self.mempool_fee_histogram = self.mempool.get_fee_histogram + self.mempool_get_utxos = self.mempool.get_utxos + self.mempool_potential_spends = self.mempool.potential_spends + self.mempool_transactions = self.mempool.transactions + self.mempool_value = self.mempool.value + self.tx_branch_and_root = self.bp.merkle.branch_and_root + self.read_headers = self.bp.read_headers + + async def broadcast_transaction(self, raw_tx): + return await self.daemon.sendrawtransaction([raw_tx]) + + async def daemon_request(self, method, args): + return await getattr(self.daemon, method)(*args) + + def db_height(self): + return self.bp.db_height + + def get_info(self): + '''Chain state info for LocalRPC and logs.''' + return { + 'daemon': self.daemon.logged_url(), + 'daemon_height': self.daemon.cached_height(), + 'db_height': self.db_height(), + } + + async def get_history(self, hashX): + '''Get history asynchronously to reduce latency.''' + def job(): + # History DoS limit. Each element of history is about 99 + # bytes when encoded as JSON. This limits resource usage + # on bloated history requests, and uses a smaller divisor + # so large requests are logged before refusing them. + limit = self.env.max_send // 97 + return list(self.bp.get_history(hashX, limit=limit)) + + hc = self.history_cache + if hashX not in hc: + hc[hashX] = await self.tasks.run_in_thread(job) + return hc[hashX] + + async def get_utxos(self, hashX): + '''Get UTXOs asynchronously to reduce latency.''' + def job(): + return list(self.bp.get_utxos(hashX, limit=None)) + + return await self.tasks.run_in_thread(job) + + def header_branch_and_root(self, length, height): + return self.bp.header_mc.branch_and_root(length, height) + + def invalidate_history_cache(self, touched): + hc = self.history_cache + for hashX in set(hc).intersection(touched): + del hc[hashX] + + def processing_new_block(self): + '''Return True if we're processing a new block.''' + return self.daemon.cached_height() > self.db_height() + + def raw_header(self, height): + '''Return the binary header at the given height.''' + header, n = self.bp.read_headers(height, 1) + if n != 1: + raise IndexError(f'height {height:,d} out of range') + return header + + def set_daemon_url(self, daemon_url): + self.daemon.set_urls(self.env.coin.daemon_urls(daemon_url)) + return self.daemon.logged_url() + + def shutdown(self): + self.tasks.loop.call_soon(self.shutdown_event.set) + + async def wait_for_mempool(self): + self.tasks.create_task(self.bp.main_loop()) + await self.bp.caught_up_event.wait() + self.tasks.create_task(self.mempool.main_loop()) + await self.mempool.synchronized_event.wait() diff --git a/electrumx/server/controller.py b/electrumx/server/controller.py index b7accab..aa26b0c 100644 --- a/electrumx/server/controller.py +++ b/electrumx/server/controller.py @@ -10,16 +10,16 @@ import electrumx from electrumx.lib.server_base import ServerBase from electrumx.lib.tasks import Tasks from electrumx.lib.util import version_string -from electrumx.server.mempool import MemPool +from electrumx.server.chain_state import ChainState from electrumx.server.peers import PeerManager from electrumx.server.session import SessionManager class Controller(ServerBase): - '''Manages the client servers, a mempool, and a block processor. + '''Manages server initialisation and stutdown. - Servers are started immediately the block processor first catches - up with the daemon. + Servers are started once the mempool is synced after the block + processor first catches up with the daemon. ''' AIORPCX_MIN = (0, 5, 6) @@ -41,20 +41,20 @@ class Controller(ServerBase): env.max_send = max(350000, env.max_send) self.tasks = Tasks() - self.session_mgr = SessionManager(env, self.tasks, self) - self.daemon = env.coin.DAEMON(env) - self.bp = env.coin.BLOCK_PROCESSOR(env, self.tasks, self.daemon) - self.mempool = MemPool(self.bp, self.daemon, self.tasks, - self.session_mgr.notify_sessions) - self.peer_mgr = PeerManager(env, self.tasks, self.session_mgr, self.bp) + self.chain_state = ChainState(env, self.tasks, self.shutdown_event) + self.peer_mgr = PeerManager(env, self.tasks, self.chain_state) + self.session_mgr = SessionManager(env, self.tasks, self.chain_state, + self.peer_mgr) async def start_servers(self): - '''Start the RPC server and schedule the external servers to be - started once the block processor has caught up. + '''Start the RPC server and wait for the mempool to synchronize. Then + start the peer manager and serving external clients. ''' await self.session_mgr.start_rpc_server() - self.tasks.create_task(self.bp.main_loop()) - self.tasks.create_task(self.wait_for_bp_catchup()) + await self.chain_state.wait_for_mempool() + self.tasks.create_task(self.peer_mgr.main_loop()) + self.tasks.create_task(self.session_mgr.start_serving()) + self.tasks.create_task(self.session_mgr.housekeeping()) async def shutdown(self): '''Perform the shutdown sequence.''' @@ -63,29 +63,4 @@ class Controller(ServerBase): await self.session_mgr.shutdown() await self.tasks.wait() # Finally shut down the block processor and executor (FIXME) - self.bp.shutdown(self.tasks.executor) - - async def mempool_transactions(self, hashX): - '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool - entries for the hashX. - - unconfirmed is True if any txin is unconfirmed. - ''' - return await self.mempool.transactions(hashX) - - def mempool_value(self, hashX): - '''Return the unconfirmed amount in the mempool for hashX. - - Can be positive or negative. - ''' - return self.mempool.value(hashX) - - async def wait_for_bp_catchup(self): - '''Wait for the block processor to catch up, and for the mempool to - synchronize, then kick off server background processes.''' - await self.bp.caught_up_event.wait() - self.tasks.create_task(self.mempool.main_loop()) - await self.mempool.synchronized_event.wait() - self.tasks.create_task(self.peer_mgr.main_loop()) - self.tasks.create_task(self.session_mgr.start_serving()) - self.tasks.create_task(self.session_mgr.housekeeping()) + self.chain_state.bp.shutdown(self.tasks.executor) diff --git a/electrumx/server/mempool.py b/electrumx/server/mempool.py index b08de09..d75fb01 100644 --- a/electrumx/server/mempool.py +++ b/electrumx/server/mempool.py @@ -15,7 +15,7 @@ from collections import defaultdict from electrumx.lib.hash import hash_to_hex_str, hex_str_to_hash from electrumx.lib.util import class_logger from electrumx.server.daemon import DaemonError -from electrumx.server.db import UTXO +from electrumx.server.db import UTXO, DB class MemPool(object): @@ -32,13 +32,11 @@ class MemPool(object): A pair is a (hashX, value) tuple. tx hashes are hex strings. ''' - def __init__(self, db, daemon, tasks, notify_sessions): + def __init__(self, coin, chain_state, tasks, add_new_block_callback): self.logger = class_logger(__name__, self.__class__.__name__) - self.db = db - self.daemon = daemon + self.coin = coin + self.chain_state = chain_state self.tasks = tasks - self.notify_sessions = notify_sessions - self.coin = db.coin self.touched = set() self.stop = False self.txs = {} @@ -47,7 +45,7 @@ class MemPool(object): self.fee_histogram = defaultdict(int) self.compact_fee_histogram = [] self.histogram_time = 0 - db.add_new_block_callback(self.on_new_block) + add_new_block_callback(self.on_new_block) def _resync_daemon_hashes(self, unprocessed, unfetched): '''Re-sync self.txs with the list of hashes in the daemon's mempool. @@ -60,7 +58,7 @@ class MemPool(object): touched = self.touched fee_hist = self.fee_histogram - hashes = self.daemon.cached_mempool_hashes() + hashes = self.chain_state.cached_mempool_hashes() gone = set(txs).difference(hashes) for hex_hash in gone: unfetched.discard(hex_hash) @@ -99,13 +97,13 @@ class MemPool(object): self.logger.info('beginning processing of daemon mempool. ' 'This can take some time...') - await self.daemon.mempool_refresh_event.wait() + await self.chain_state.mempool_refresh_event.wait() next_log = 0 loops = -1 # Zero during initial catchup while True: # Avoid double notifications if processing a block - if self.touched and not self.processing_new_block(): + if self.touched and not self.chain_state.processing_new_block(): self.notify_sessions(self.touched) self.touched.clear() @@ -127,10 +125,10 @@ class MemPool(object): try: if not todo: - await self.daemon.mempool_refresh_event.wait() + await self.chain_state.mempool_refresh_event.wait() self._resync_daemon_hashes(unprocessed, unfetched) - self.daemon.mempool_refresh_event.clear() + self.chain_state.mempool_refresh_event.clear() if unfetched: count = min(len(unfetched), fetch_size) @@ -196,13 +194,9 @@ class MemPool(object): self.touched.clear() self.notify_sessions(touched) - def processing_new_block(self): - '''Return True if we're processing a new block.''' - return self.daemon.cached_height() > self.db.db_height - async def fetch_raw_txs(self, hex_hashes): '''Fetch a list of mempool transactions.''' - raw_txs = await self.daemon.getrawtransactions(hex_hashes) + raw_txs = await self.chain_state.getrawtransactions(hex_hashes) # Skip hashes the daemon has dropped. Either they were # evicted or they got in a block. @@ -218,7 +212,6 @@ class MemPool(object): ''' script_hashX = self.coin.hashX_from_script deserializer = self.coin.DESERIALIZER - db_utxo_lookup = self.db.db_utxo_lookup txs = self.txs # Deserialize each tx and put it in a pending list @@ -240,6 +233,7 @@ class MemPool(object): # Now process what we can result = {} deferred = [] + utxo_lookup = self.chain_state.utxo_lookup for item in pending: if self.stop: @@ -264,8 +258,8 @@ class MemPool(object): txin_pairs.append(tx_info[1][prev_idx]) elif not mempool_missing: prev_hash = hex_str_to_hash(prev_hex_hash) - txin_pairs.append(db_utxo_lookup(prev_hash, prev_idx)) - except (self.db.MissingUTXOError, self.db.DBError): + txin_pairs.append(utxo_lookup(prev_hash, prev_idx)) + except (DB.MissingUTXOError, DB.DBError): # DBError can happen when flushing a newly processed # block. MissingUTXOError typically happens just # after the daemon has accepted a new block and the @@ -293,7 +287,7 @@ class MemPool(object): return [] hex_hashes = self.hashXs[hashX] - raw_txs = await self.daemon.getrawtransactions(hex_hashes) + raw_txs = await self.chain_state.getrawtransactions(hex_hashes) return zip(hex_hashes, raw_txs) async def transactions(self, hashX): diff --git a/electrumx/server/peers.py b/electrumx/server/peers.py index d6bb6a0..d4b9235 100644 --- a/electrumx/server/peers.py +++ b/electrumx/server/peers.py @@ -141,7 +141,7 @@ class PeerSession(ClientSession): return result = request.result() - our_height = self.peer_mgr.bp.db_height + our_height = self.peer_mgr.chain_state.db_height() if self.ptuple < (1, 3): their_height = result.get('block_height') else: @@ -155,7 +155,7 @@ class PeerSession(ClientSession): return # Check prior header too in case of hard fork. check_height = min(our_height, their_height) - raw_header = self.peer_mgr.session_mgr.raw_header(check_height) + raw_header = self.peer_mgr.chain_state.raw_header(check_height) if self.ptuple >= (1, 4): self.send_request('blockchain.block.header', [check_height], partial(self.on_header, raw_header.hex()), @@ -240,14 +240,13 @@ class PeerManager(object): Attempts to maintain a connection with up to 8 peers. Issues a 'peers.subscribe' RPC to them and tells them our data. ''' - def __init__(self, env, tasks, session_mgr, bp): + def __init__(self, env, tasks, chain_state): self.logger = class_logger(__name__, self.__class__.__name__) # Initialise the Peer class Peer.DEFAULT_PORTS = env.coin.PEER_DEFAULT_PORTS self.env = env self.tasks = tasks - self.session_mgr = session_mgr - self.bp = bp + self.chain_state = chain_state self.loop = tasks.loop # Our clearnet and Tor Peers, if any diff --git a/electrumx/server/session.py b/electrumx/server/session.py index 6ac8293..cb34d5b 100644 --- a/electrumx/server/session.py +++ b/electrumx/server/session.py @@ -18,7 +18,6 @@ import time from collections import defaultdict from functools import partial -import pylru from aiorpcx import ServerSession, JSONRPCAutoDetect, RPCError import electrumx @@ -98,10 +97,11 @@ class SessionManager(object): CATCHING_UP, LISTENING, PAUSED, SHUTTING_DOWN = range(4) - def __init__(self, env, tasks, controller): + def __init__(self, env, tasks, chain_state, peer_mgr): self.env = env self.tasks = tasks - self.controller = controller + self.chain_state = chain_state + self.peer_mgr = peer_mgr self.logger = util.class_logger(__name__, self.__class__.__name__) self.servers = {} self.sessions = set() @@ -113,7 +113,6 @@ class SessionManager(object): self.state = self.CATCHING_UP self.txs_sent = 0 self.start_time = time.time() - self.history_cache = pylru.lrucache(256) # Cache some idea of room to avoid recounting on each subscription self.subs_room = 0 # Masternode stuff only for such coins @@ -122,6 +121,8 @@ class SessionManager(object): self.mn_cache = [] # Event triggered when electrumx is listening for incoming requests. self.server_listening = asyncio.Event() + # FIXME + chain_state.mempool.notify_sessions = self.notify_sessions # Set up the RPC request handlers cmds = ('add_peer daemon_url disconnect getinfo groups log peers ' 'reorg sessions stop'.split()) @@ -133,7 +134,8 @@ class SessionManager(object): protocol_class = LocalRPC else: protocol_class = self.env.coin.SESSIONCLS - protocol_factory = partial(protocol_class, self, self.controller, kind) + protocol_factory = partial(protocol_class, self, self.chain_state, + self.peer_mgr, kind) server = loop.create_server(protocol_factory, *args, **kw_args) host, port = args[:2] @@ -237,30 +239,26 @@ class SessionManager(object): for session in group_map[group]: session.group = new_group - def _getinfo(self): - '''A one-line summary of server state.''' + def _get_info(self): + '''A summary of server state.''' group_map = self._group_map() - daemon = self.controller.daemon - bp = self.controller.bp - peer_mgr = self.controller.peer_mgr - return { + result = self.chain_state.get_info() + result.update({ 'version': electrumx.version, - 'daemon': daemon.logged_url(), - 'daemon_height': daemon.cached_height(), - 'db_height': bp.db_height, 'closing': len([s for s in self.sessions if s.is_closing()]), 'errors': sum(s.rpc.errors for s in self.sessions), 'groups': len(group_map), 'logged': len([s for s in self.sessions if s.log_me]), 'paused': sum(s.paused for s in self.sessions), 'pid': os.getpid(), - 'peers': peer_mgr.info(), + 'peers': self.peer_mgr.info(), 'requests': sum(s.count_pending_items() for s in self.sessions), 'sessions': self.session_count(), 'subs': self._sub_count(), 'txs_sent': self.txs_sent, 'uptime': util.formatted_time(time.time() - self.start_time), - } + }) + return result def _session_data(self, for_log): '''Returned to the RPC 'sessions' call.''' @@ -305,7 +303,7 @@ class SessionManager(object): real_name: a real name, as would appear on IRC ''' peer = Peer.from_real_name(real_name, 'RPC') - self.controller.peer_mgr.add_peers([peer]) + self.peer_mgr.add_peers([peer]) return "peer '{}' added".format(real_name) def rpc_disconnect(self, session_ids): @@ -330,22 +328,20 @@ class SessionManager(object): def rpc_daemon_url(self, daemon_url=None): '''Replace the daemon URL.''' daemon_url = daemon_url or self.env.daemon_url - daemon = self.controller.daemon try: - daemon.set_urls(self.env.coin.daemon_urls(daemon_url)) + daemon_url = self.chain_state.set_daemon_url(daemon_url) except Exception as e: raise RPCError(BAD_REQUEST, f'an error occured: {e}') - return 'now using daemon at {}'.format(daemon.logged_url()) + return f'now using daemon at {daemon_url}' def rpc_stop(self): '''Shut down the server cleanly.''' - loop = asyncio.get_event_loop() - loop.call_soon(self.controller.shutdown_event.set) + self.chain_state.shutdown() return 'stopping' def rpc_getinfo(self): '''Return summary information about the server process.''' - return self._getinfo() + return self._get_info() def rpc_groups(self): '''Return statistics about the session groups.''' @@ -353,7 +349,7 @@ class SessionManager(object): def rpc_peers(self): '''Return a list of data about server peers.''' - return self.controller.peer_mgr.rpc_data() + return self.peer_mgr.rpc_data() def rpc_sessions(self): '''Return statistics about connected sessions.''' @@ -365,7 +361,7 @@ class SessionManager(object): count: number of blocks to reorg (default 3) ''' count = non_negative_integer(count) - if not self.controller.bp.force_chain_reorg(count): + if not self.chain_state.force_chain_reorg(count): raise RPCError(BAD_REQUEST, 'still catching up with daemon') return 'scheduled a reorg of {:,d} blocks'.format(count) @@ -409,14 +405,10 @@ class SessionManager(object): def notify_sessions(self, touched): '''Notify sessions about height changes and touched addresses.''' - height = self.controller.bp.db_height - # Invalidate caches - hc = self.history_cache - for hashX in set(hc).intersection(touched): - del hc[hashX] - + self.chain_state.invalidate_history_cache(touched) # Height notifications are synchronous. Those sessions with # touched addresses are scheduled for asynchronous completion + height = self.chain_state.db_height() for session in self.sessions: if isinstance(session, LocalRPC): continue @@ -424,37 +416,6 @@ class SessionManager(object): if session_touched is not None: self.tasks.create_task(session.notify_async(session_touched)) - def raw_header(self, height): - '''Return the binary header at the given height.''' - header, n = self.controller.bp.read_headers(height, 1) - if n != 1: - raise RPCError(BAD_REQUEST, f'height {height:,d} out of range') - return header - - async def get_history(self, hashX): - '''Get history asynchronously to reduce latency.''' - if hashX in self.history_cache: - return self.history_cache[hashX] - - def job(): - # History DoS limit. Each element of history is about 99 - # bytes when encoded as JSON. This limits resource usage - # on bloated history requests, and uses a smaller divisor - # so large requests are logged before refusing them. - limit = self.env.max_send // 97 - return list(self.controller.bp.get_history(hashX, limit=limit)) - - history = await self.tasks.run_in_thread(job) - self.history_cache[hashX] = history - return history - - async def get_utxos(self, hashX): - '''Get UTXOs asynchronously to reduce latency.''' - def job(): - return list(self.controller.bp.get_utxos(hashX, limit=None)) - - return await self.tasks.run_in_thread(job) - async def housekeeping(self): '''Regular housekeeping checks.''' n = 0 @@ -476,7 +437,7 @@ class SessionManager(object): data = self._session_data(for_log=True) for line in text.sessions_lines(data): self.logger.info(line) - self.logger.info(json.dumps(self._getinfo())) + self.logger.info(json.dumps(self._get_info())) self.next_log_sessions = time.time() + self.env.log_sessions def add_session(self, session): @@ -516,16 +477,15 @@ class SessionBase(ServerSession): MAX_CHUNK_SIZE = 2016 session_counter = itertools.count() - def __init__(self, session_mgr, controller, kind): + def __init__(self, session_mgr, chain_state, peer_mgr, kind): super().__init__(rpc_protocol=JSONRPCAutoDetect) self.logger = util.class_logger(__name__, self.__class__.__name__) self.session_mgr = session_mgr - self.controller = controller + self.chain_state = chain_state + self.peer_mgr = peer_mgr self.kind = kind # 'RPC', 'TCP' etc. - self.bp = controller.bp - self.env = controller.env + self.env = chain_state.env self.coin = self.env.coin - self.daemon = self.bp.daemon self.client = 'unknown' self.anon_logs = self.env.anon_logs self.txs_sent = 0 @@ -615,6 +575,7 @@ class ElectrumX(SessionBase): self.sv_seen = False self.mempool_statuses = {} self.set_protocol_handlers(self.PROTOCOL_MIN) + self.db_height = self.chain_state.db_height @classmethod def protocol_min_max_strings(cls): @@ -646,7 +607,7 @@ class ElectrumX(SessionBase): async def daemon_request(self, method, *args): '''Catch a DaemonError and convert it to an RPCError.''' try: - return await getattr(self.controller.daemon, method)(*args) + return await self.chain_state.daemon_request(method, args) except DaemonError as e: raise RPCError(DAEMON_ERROR, f'daemon error: {e}') @@ -706,25 +667,28 @@ class ElectrumX(SessionBase): return None - def height(self): - '''Return the current flushed database height.''' - return self.bp.db_height - def assert_boolean(self, value): '''Return param value it is boolean otherwise raise an RPCError.''' if value in (False, True): return value raise RPCError(BAD_REQUEST, f'{value} should be a boolean value') + def raw_header(self, height): + '''Return the binary header at the given height.''' + try: + return self.chain_state.raw_header(height) + except IndexError: + raise RPCError(BAD_REQUEST, f'height {height:,d} out of range') + def electrum_header(self, height): '''Return the deserialized header at the given height.''' - raw_header = self.session_mgr.raw_header(height) + raw_header = self.raw_header(height) return self.coin.electrum_header(raw_header, height) def subscribe_headers_result(self, height): '''The result of a header subscription for the given height.''' if self.subscribe_headers_raw: - raw_header = self.session_mgr.raw_header(height) + raw_header = self.raw_header(height) return {'hex': raw_header.hex(), 'height': height} return self.electrum_header(height) @@ -732,8 +696,8 @@ class ElectrumX(SessionBase): '''Subscribe to get headers of new blocks.''' self.subscribe_headers = True self.subscribe_headers_raw = self.assert_boolean(raw) - self.notified_height = self.height() - return self.subscribe_headers_result(self.height()) + self.notified_height = self.db_height() + return self.subscribe_headers_result(self.notified_height) def headers_subscribe(self): '''Subscribe to get raw headers of new blocks.''' @@ -749,12 +713,12 @@ class ElectrumX(SessionBase): async def add_peer(self, features): '''Add a peer (but only if the peer resolves to the source).''' - peer_mgr = self.controller.peer_mgr + peer_mgr = self.peer_mgr return await peer_mgr.on_add_peer(features, self.peer_address()) def peers_subscribe(self): '''Return the server peers as a list of (ip, host, details) tuples.''' - return self.controller.peer_mgr.on_peers_subscribe(self.is_tor()) + return self.peer_mgr.on_peers_subscribe(self.is_tor()) async def address_status(self, hashX): '''Returns an address status. @@ -763,8 +727,8 @@ class ElectrumX(SessionBase): ''' # Note history is ordered and mempool unordered in electrum-server # For mempool, height is -1 if unconfirmed txins, otherwise 0 - history = await self.session_mgr.get_history(hashX) - mempool = await self.controller.mempool_transactions(hashX) + history = await self.chain_state.get_history(hashX) + mempool = await self.chain_state.mempool_transactions(hashX) status = ''.join('{}:{:d}:'.format(hash_to_hex_str(tx_hash), height) for tx_hash, height in history) @@ -785,10 +749,10 @@ class ElectrumX(SessionBase): async def hashX_listunspent(self, hashX): '''Return the list of UTXOs of a script hash, including mempool effects.''' - utxos = await self.session_mgr.get_utxos(hashX) + utxos = await self.chain_state.get_utxos(hashX) utxos = sorted(utxos) - utxos.extend(self.controller.mempool.get_utxos(hashX)) - spends = await self.controller.mempool.potential_spends(hashX) + utxos.extend(self.chain_state.mempool_get_utxos(hashX)) + spends = await self.chain_state.mempool_potential_spends(hashX) return [{'tx_hash': hash_to_hex_str(utxo.tx_hash), 'tx_pos': utxo.tx_pos, @@ -802,7 +766,7 @@ class ElectrumX(SessionBase): raise RPCError(BAD_REQUEST, 'your address subscription limit ' f'{self.max_subs:,d} reached') - # Now let the controller check its limit + # Now let the session manager check its limit self.session_mgr.new_subscription() self.hashX_subs[hashX] = alias return await self.address_status(hashX) @@ -842,9 +806,9 @@ class ElectrumX(SessionBase): return await self.hashX_subscribe(hashX, address) async def get_balance(self, hashX): - utxos = await self.session_mgr.get_utxos(hashX) + utxos = await self.chain_state.get_utxos(hashX) confirmed = sum(utxo.value for utxo in utxos) - unconfirmed = self.controller.mempool_value(hashX) + unconfirmed = self.chain_state.mempool_value(hashX) return {'confirmed': confirmed, 'unconfirmed': unconfirmed} async def scripthash_get_balance(self, scripthash): @@ -855,13 +819,13 @@ class ElectrumX(SessionBase): async def unconfirmed_history(self, hashX): # Note unconfirmed history is unordered in electrum-server # Height is -1 if unconfirmed txins, otherwise 0 - mempool = await self.controller.mempool_transactions(hashX) + mempool = await self.chain_state.mempool_transactions(hashX) return [{'tx_hash': tx_hash, 'height': -unconfirmed, 'fee': fee} for tx_hash, fee, unconfirmed in mempool] async def confirmed_and_unconfirmed_history(self, hashX): # Note history is ordered but unconfirmed is unordered in e-s - history = await self.session_mgr.get_history(hashX) + history = await self.chain_state.get_history(hashX) conf = [{'tx_hash': hash_to_hex_str(tx_hash), 'height': height} for tx_hash, height in history] return conf + await self.unconfirmed_history(hashX) @@ -889,13 +853,14 @@ class ElectrumX(SessionBase): return await self.hashX_subscribe(hashX, scripthash) def _merkle_proof(self, cp_height, height): - max_height = self.height() + max_height = self.db_height() if not height <= cp_height <= max_height: raise RPCError(BAD_REQUEST, f'require header height {height:,d} <= ' f'cp_height {cp_height:,d} <= ' f'chain height {max_height:,d}') - branch, root = self.bp.header_mc.branch_and_root(cp_height + 1, height) + branch, root = self.chain_state.header_branch_and_root( + cp_height + 1, height) return { 'branch': [hash_to_hex_str(elt) for elt in branch], 'root': hash_to_hex_str(root), @@ -906,7 +871,7 @@ class ElectrumX(SessionBase): dictionary with a merkle proof.''' height = non_negative_integer(height) cp_height = non_negative_integer(cp_height) - raw_header_hex = self.session_mgr.raw_header(height).hex() + raw_header_hex = self.raw_header(height).hex() if cp_height == 0: return raw_header_hex result = {'header': raw_header_hex} @@ -932,7 +897,7 @@ class ElectrumX(SessionBase): max_size = self.MAX_CHUNK_SIZE count = min(count, max_size) - headers, count = self.bp.read_headers(start_height, count) + headers, count = self.chain_state.read_headers(start_height, count) result = {'hex': headers.hex(), 'count': count, 'max': max_size} if count and cp_height: last_height = start_height + count - 1 @@ -947,9 +912,9 @@ class ElectrumX(SessionBase): index: the chunk index''' index = non_negative_integer(index) - chunk_size = self.coin.CHUNK_SIZE - start_height = index * chunk_size - headers, count = self.bp.read_headers(start_height, chunk_size) + size = self.coin.CHUNK_SIZE + start_height = index * size + headers, count = self.chain_state.read_headers(start_height, size) return headers.hex() def block_get_header(self, height): @@ -962,7 +927,7 @@ class ElectrumX(SessionBase): def is_tor(self): '''Try to detect if the connection is to a tor hidden service we are running.''' - peername = self.controller.peer_mgr.proxy_peername() + peername = self.peer_mgr.proxy_peername() if not peername: return False peer_address = self.peer_address() @@ -1010,7 +975,7 @@ class ElectrumX(SessionBase): def mempool_get_fee_histogram(self): '''Memory pool fee histogram.''' - return self.controller.mempool.get_fee_histogram() + return self.chain_state.mempool_fee_histogram() async def relayfee(self): '''The minimum fee a low-priority tx must pay in order to be accepted @@ -1073,7 +1038,7 @@ class ElectrumX(SessionBase): raw_tx: the raw transaction as a hexadecimal string''' # This returns errors as JSON RPC errors, as is natural try: - tx_hash = await self.daemon.sendrawtransaction([raw_tx]) + tx_hash = await self.chain_state.broadcast_transaction(raw_tx) self.txs_sent += 1 self.session_mgr.txs_sent += 1 self.logger.info('sent tx: {}'.format(tx_hash)) @@ -1117,7 +1082,7 @@ class ElectrumX(SessionBase): tx_pos: index of transaction in tx_hashes to create branch for ''' hashes = [hex_str_to_hash(hash) for hash in tx_hashes] - branch, root = self.bp.merkle.branch_and_root(hashes, tx_pos) + branch, root = self.chain_state.tx_branch_and_root(hashes, tx_pos) branch = [hash_to_hex_str(hash) for hash in branch] return branch @@ -1254,10 +1219,11 @@ class DashElectrumX(ElectrumX): }) async def notify_masternodes_async(self): - for masternode in self.mns: - status = await self.daemon.masternode_list(['status', masternode]) + for mn in self.mns: + status = await self.daemon_request('masternode_list', + ['status', mn]) self.send_notification('masternode.subscribe', - [masternode, status.get(masternode)]) + [mn, status.get(mn)]) def notify(self, height, touched): '''Notify the client about changes in masternode list.''' @@ -1274,7 +1240,8 @@ class DashElectrumX(ElectrumX): signmnb: signed masternode broadcast message.''' try: - return await self.daemon.masternode_broadcast(['relay', signmnb]) + return await self.daemon_request('masternode_broadcast', + ['relay', signmnb]) except DaemonError as e: error, = e.args message = error['message'] @@ -1287,7 +1254,8 @@ class DashElectrumX(ElectrumX): collateral: masternode collateral. ''' - result = await self.daemon.masternode_list(['status', collateral]) + result = await self.daemon_request('masternode_list', + ['status', collateral]) if result is not None: self.mns.add(collateral) return result.get(collateral) @@ -1350,8 +1318,9 @@ class DashElectrumX(ElectrumX): # with the masternode information including the payment # position is returned. cache = self.session_mgr.mn_cache - if not cache or self.session_mgr.mn_cache_height != self.height(): - full_mn_list = await self.daemon.masternode_list(['full']) + if not cache or self.session_mgr.mn_cache_height != self.db_height(): + full_mn_list = await self.daemon_request('masternode_list', + ['full']) mn_payment_queue = get_masternode_payment_queue(full_mn_list) mn_payment_count = len(mn_payment_queue) mn_list = [] @@ -1377,7 +1346,7 @@ class DashElectrumX(ElectrumX): mn_list.append(mn_info) cache.clear() cache.extend(mn_list) - self.session_mgr.mn_cache_height = self.height() + self.session_mgr.mn_cache_height = self.db_height() # If payees is an empty list the whole masternode list is returned if payees: