diff --git a/electrumx/server/chain_state.py b/electrumx/server/chain_state.py deleted file mode 100644 index 135d42a..0000000 --- a/electrumx/server/chain_state.py +++ /dev/null @@ -1,102 +0,0 @@ -# Copyright (c) 2016-2018, Neil Booth -# -# All rights reserved. -# -# See the file "LICENCE" for information about the copyright -# and warranty status of this software. - - -from electrumx.lib.hash import hash_to_hex_str - - -class ChainState(object): - '''Used as an interface by servers to request information about - blocks, transaction history, UTXOs and the mempool. - ''' - - def __init__(self, env, db, daemon, bp): - self._env = env - self._db = db - self._daemon = daemon - - # External interface pass-throughs for session.py - self.force_chain_reorg = bp.force_chain_reorg - self.tx_branch_and_root = db.merkle.branch_and_root - self.read_headers = db.read_headers - self.all_utxos = db.all_utxos - self.limited_history = db.limited_history - self.header_branch_and_root = db.header_branch_and_root - - async def broadcast_transaction(self, raw_tx): - return await self._daemon.sendrawtransaction([raw_tx]) - - async def daemon_request(self, method, args=()): - return await getattr(self._daemon, method)(*args) - - def db_height(self): - return self._db.db_height - - def get_info(self): - '''Chain state info for LocalRPC and logs.''' - return { - 'daemon': self._daemon.logged_url(), - 'daemon_height': self._daemon.cached_height(), - 'db_height': self.db_height(), - } - - async def raw_header(self, height): - '''Return the binary header at the given height.''' - header, n = await self.read_headers(height, 1) - if n != 1: - raise IndexError(f'height {height:,d} out of range') - return header - - def set_daemon_url(self, daemon_url): - self._daemon.set_urls(self._env.coin.daemon_urls(daemon_url)) - return self._daemon.logged_url() - - async def query(self, args, limit): - coin = self._env.coin - db = self._db - lines = [] - - def arg_to_hashX(arg): - try: - script = bytes.fromhex(arg) - lines.append(f'Script: {arg}') - return coin.hashX_from_script(script) - except ValueError: - pass - - hashX = coin.address_to_hashX(arg) - lines.append(f'Address: {arg}') - return hashX - - for arg in args: - hashX = arg_to_hashX(arg) - if not hashX: - continue - n = None - history = await db.limited_history(hashX, limit=limit) - for n, (tx_hash, height) in enumerate(history): - lines.append(f'History #{n:,d}: height {height:,d} ' - f'tx_hash {hash_to_hex_str(tx_hash)}') - if n is None: - lines.append('No history found') - n = None - utxos = await db.all_utxos(hashX) - for n, utxo in enumerate(utxos, start=1): - lines.append(f'UTXO #{n:,d}: tx_hash ' - f'{hash_to_hex_str(utxo.tx_hash)} ' - f'tx_pos {utxo.tx_pos:,d} height ' - f'{utxo.height:,d} value {utxo.value:,d}') - if n == limit: - break - if n is None: - lines.append('No UTXOs found') - - balance = sum(utxo.value for utxo in utxos) - lines.append(f'Balance: {coin.decimal_value(balance):,f} ' - f'{coin.SHORTNAME}') - - return lines diff --git a/electrumx/server/controller.py b/electrumx/server/controller.py index 9061f93..5b156c2 100644 --- a/electrumx/server/controller.py +++ b/electrumx/server/controller.py @@ -12,7 +12,6 @@ from aiorpcx import _version as aiorpcx_version, TaskGroup import electrumx from electrumx.lib.server_base import ServerBase from electrumx.lib.util import version_string -from electrumx.server.chain_state import ChainState from electrumx.server.db import DB from electrumx.server.mempool import MemPool, MemPoolAPI from electrumx.server.session import SessionManager @@ -97,7 +96,6 @@ class Controller(ServerBase): db = DB(env) BlockProcessor = env.coin.BLOCK_PROCESSOR bp = BlockProcessor(env, db, daemon, notifications) - chain_state = ChainState(env, db, daemon, bp) # Set ourselves up to implement the MemPoolAPI self.height = daemon.height @@ -109,7 +107,7 @@ class Controller(ServerBase): MemPoolAPI.register(Controller) mempool = MemPool(env.coin, self) - session_mgr = SessionManager(env, chain_state, mempool, + session_mgr = SessionManager(env, db, bp, daemon, mempool, notifications, shutdown_event) caught_up_event = Event() diff --git a/electrumx/server/db.py b/electrumx/server/db.py index 89c8d5e..e134cec 100644 --- a/electrumx/server/db.py +++ b/electrumx/server/db.py @@ -370,6 +370,13 @@ class DB(object): # Truncate header_mc: header count is 1 more than the height. self.header_mc.truncate(height + 1) + async def raw_header(self, height): + '''Return the binary header at the given height.''' + header, n = await self.read_headers(height, 1) + if n != 1: + raise IndexError(f'height {height:,d} out of range') + return header + async def read_headers(self, start_height, count): '''Requires start_height >= 0, count >= 0. Reads as many headers as are available starting at start_height up to count. This diff --git a/electrumx/server/peers.py b/electrumx/server/peers.py index 31e9f74..ad36dbf 100644 --- a/electrumx/server/peers.py +++ b/electrumx/server/peers.py @@ -55,12 +55,12 @@ class PeerManager(object): Attempts to maintain a connection with up to 8 peers. Issues a 'peers.subscribe' RPC to them and tells them our data. ''' - def __init__(self, env, chain_state): + def __init__(self, env, db): self.logger = class_logger(__name__, self.__class__.__name__) # Initialise the Peer class Peer.DEFAULT_PORTS = env.coin.PEER_DEFAULT_PORTS self.env = env - self.chain_state = chain_state + self.db = db # Our clearnet and Tor Peers, if any sclass = env.coin.SESSIONCLS @@ -300,7 +300,7 @@ class PeerManager(object): result = await session.send_request(message) assert_good(message, result, dict) - our_height = self.chain_state.db_height() + our_height = self.db.db_height if ptuple < (1, 3): their_height = result.get('block_height') else: @@ -313,7 +313,7 @@ class PeerManager(object): # Check prior header too in case of hard fork. check_height = min(our_height, their_height) - raw_header = await self.chain_state.raw_header(check_height) + raw_header = await self.db.raw_header(check_height) if ptuple >= (1, 4): ours = raw_header.hex() message = 'blockchain.block.header' diff --git a/electrumx/server/session.py b/electrumx/server/session.py index 5d6b9f3..6d6b3e1 100644 --- a/electrumx/server/session.py +++ b/electrumx/server/session.py @@ -109,13 +109,15 @@ class SessionManager(object): CATCHING_UP, LISTENING, PAUSED, SHUTTING_DOWN = range(4) - def __init__(self, env, chain_state, mempool, notifications, + def __init__(self, env, db, bp, daemon, mempool, notifications, shutdown_event): env.max_send = max(350000, env.max_send) self.env = env - self.chain_state = chain_state + self.db = db + self.bp = bp + self.daemon = daemon self.mempool = mempool - self.peer_mgr = PeerManager(env, chain_state) + self.peer_mgr = PeerManager(env, db) self.shutdown_event = shutdown_event self.logger = util.class_logger(__name__, self.__class__.__name__) self.servers = {} @@ -152,7 +154,7 @@ class SessionManager(object): protocol_class = LocalRPC else: protocol_class = self.env.coin.SESSIONCLS - protocol_factory = partial(protocol_class, self, self.chain_state, + protocol_factory = partial(protocol_class, self, self.db, self.mempool, self.peer_mgr, kind) server = loop.create_server(protocol_factory, *args, **kw_args) @@ -276,10 +278,11 @@ class SessionManager(object): def _get_info(self): '''A summary of server state.''' group_map = self._group_map() - result = self.chain_state.get_info() - result.update({ - 'version': electrumx.version, + return { 'closing': len([s for s in self.sessions if s.is_closing()]), + 'daemon': self.daemon.logged_url(), + 'daemon_height': self.daemon.cached_height(), + 'db_height': self.db.db_height, 'errors': sum(s.errors for s in self.sessions), 'groups': len(group_map), 'logged': len([s for s in self.sessions if s.log_me]), @@ -291,8 +294,8 @@ class SessionManager(object): 'subs': self._sub_count(), 'txs_sent': self.txs_sent, 'uptime': util.formatted_time(time.time() - self.start_time), - }) - return result + 'version': electrumx.version, + } def _session_data(self, for_log): '''Returned to the RPC 'sessions' call.''' @@ -367,10 +370,10 @@ class SessionManager(object): '''Replace the daemon URL.''' daemon_url = daemon_url or self.env.daemon_url try: - daemon_url = self.chain_state.set_daemon_url(daemon_url) + self.daemon.set_urls(self.env.coin.daemon_urls(daemon_url)) except Exception as e: raise RPCError(BAD_REQUEST, f'an error occured: {e!r}') - return f'now using daemon at {daemon_url}' + return f'now using daemon at {self.daemon.logged_url()}' async def rpc_stop(self): '''Shut down the server cleanly.''' @@ -391,10 +394,54 @@ class SessionManager(object): async def rpc_query(self, items, limit): '''Return a list of data about server peers.''' - try: - return await self.chain_state.query(items, limit) - except Base58Error as e: - raise RPCError(BAD_REQUEST, e.args[0]) from None + coin = self.env.coin + db = self.db + lines = [] + + def arg_to_hashX(arg): + try: + script = bytes.fromhex(arg) + lines.append(f'Script: {arg}') + return coin.hashX_from_script(script) + except ValueError: + pass + + try: + hashX = coin.address_to_hashX(arg) + except Base58Error as e: + lines.append(e.args[0]) + return None + lines.append(f'Address: {arg}') + return hashX + + for arg in args: + hashX = arg_to_hashX(arg) + if not hashX: + continue + n = None + history = await db.limited_history(hashX, limit=limit) + for n, (tx_hash, height) in enumerate(history): + lines.append(f'History #{n:,d}: height {height:,d} ' + f'tx_hash {hash_to_hex_str(tx_hash)}') + if n is None: + lines.append('No history found') + n = None + utxos = await db.all_utxos(hashX) + for n, utxo in enumerate(utxos, start=1): + lines.append(f'UTXO #{n:,d}: tx_hash ' + f'{hash_to_hex_str(utxo.tx_hash)} ' + f'tx_pos {utxo.tx_pos:,d} height ' + f'{utxo.height:,d} value {utxo.value:,d}') + if n == limit: + break + if n is None: + lines.append('No UTXOs found') + + balance = sum(utxo.value for utxo in utxos) + lines.append(f'Balance: {coin.decimal_value(balance):,f} ' + f'{coin.SHORTNAME}') + + return lines async def rpc_sessions(self): '''Return statistics about connected sessions.''' @@ -406,7 +453,7 @@ class SessionManager(object): count: number of blocks to reorg ''' count = non_negative_integer(count) - if not self.chain_state.force_chain_reorg(count): + if not self.bp.force_chain_reorg(count): raise RPCError(BAD_REQUEST, 'still catching up with daemon') return f'scheduled a reorg of {count:,d} blocks' @@ -454,6 +501,18 @@ class SessionManager(object): '''The number of connections that we've sent something to.''' return len(self.sessions) + async def daemon_request(self, method, *args): + '''Catch a DaemonError and convert it to an RPCError.''' + try: + return await getattr(self.daemon, method)(*args) + except DaemonError as e: + raise RPCError(DAEMON_ERROR, f'daemon error: {e!r}') from None + + async def broadcast_transaction(self, raw_tx): + hex_hash = await self.daemon.sendrawtransaction([raw_tx]) + self.txs_sent += 1 + return hex_hash + async def limited_history(self, hashX): '''A caching layer.''' hc = self._history_cache @@ -463,8 +522,7 @@ class SessionManager(object): # on bloated history requests, and uses a smaller divisor # so large requests are logged before refusing them. limit = self.env.max_send // 97 - hc[hashX] = await self.chain_state.limited_history(hashX, - limit=limit) + hc[hashX] = await self.db.limited_history(hashX, limit=limit) return hc[hashX] async def _notify_sessions(self, height, touched): @@ -518,12 +576,12 @@ class SessionBase(ServerSession): MAX_CHUNK_SIZE = 2016 session_counter = itertools.count() - def __init__(self, session_mgr, chain_state, mempool, peer_mgr, kind): + def __init__(self, session_mgr, db, mempool, peer_mgr, kind): connection = JSONRPCConnection(JSONRPCAutoDetect) super().__init__(connection=connection) self.logger = util.class_logger(__name__, self.__class__.__name__) self.session_mgr = session_mgr - self.chain_state = chain_state + self.db = db self.mempool = mempool self.peer_mgr = peer_mgr self.kind = kind # 'RPC', 'TCP' etc. @@ -534,6 +592,7 @@ class SessionBase(ServerSession): self.txs_sent = 0 self.log_me = False self.bw_limit = self.env.bandwidth_limit + self.daemon_request = self.session_mgr.daemon_request # Hijack the connection so we can log messages self._receive_message_orig = self.connection.receive_message self.connection.receive_message = self.receive_message @@ -630,7 +689,6 @@ class ElectrumX(SessionBase): self.sv_seen = False self.mempool_statuses = {} self.set_request_handlers(self.PROTOCOL_MIN) - self.db_height = self.chain_state.db_height @classmethod def protocol_min_max_strings(cls): @@ -662,13 +720,6 @@ class ElectrumX(SessionBase): def protocol_version_string(self): return util.version_string(self.protocol_tuple) - async def daemon_request(self, method, *args): - '''Catch a DaemonError and convert it to an RPCError.''' - try: - return await self.chain_state.daemon_request(method, args) - except DaemonError as e: - raise RPCError(DAEMON_ERROR, f'daemon error: {e!r}') from None - def sub_count(self): return len(self.hashX_subs) @@ -729,7 +780,7 @@ class ElectrumX(SessionBase): async def raw_header(self, height): '''Return the binary header at the given height.''' try: - return await self.chain_state.raw_header(height) + return await self.db.raw_header(height) except IndexError: raise RPCError(BAD_REQUEST, f'height {height:,d} ' 'out of range') from None @@ -750,7 +801,7 @@ class ElectrumX(SessionBase): '''Subscribe to get headers of new blocks.''' self.subscribe_headers = True self.subscribe_headers_raw = assert_boolean(raw) - self.notified_height = self.db_height() + self.notified_height = self.db.db_height return await self.subscribe_headers_result(self.notified_height) async def headers_subscribe(self): @@ -804,7 +855,7 @@ class ElectrumX(SessionBase): async def hashX_listunspent(self, hashX): '''Return the list of UTXOs of a script hash, including mempool effects.''' - utxos = await self.chain_state.all_utxos(hashX) + utxos = await self.db.all_utxos(hashX) utxos = sorted(utxos) utxos.extend(await self.mempool.unordered_UTXOs(hashX)) spends = await self.mempool.potential_spends(hashX) @@ -861,7 +912,7 @@ class ElectrumX(SessionBase): return await self.hashX_subscribe(hashX, address) async def get_balance(self, hashX): - utxos = await self.chain_state.all_utxos(hashX) + utxos = await self.db.all_utxos(hashX) confirmed = sum(utxo.value for utxo in utxos) unconfirmed = await self.mempool.balance_delta(hashX) return {'confirmed': confirmed, 'unconfirmed': unconfirmed} @@ -909,14 +960,14 @@ class ElectrumX(SessionBase): return await self.hashX_subscribe(hashX, scripthash) async def _merkle_proof(self, cp_height, height): - max_height = self.db_height() + max_height = self.db.db_height if not height <= cp_height <= max_height: raise RPCError(BAD_REQUEST, f'require header height {height:,d} <= ' f'cp_height {cp_height:,d} <= ' f'chain height {max_height:,d}') - branch, root = await self.chain_state.header_branch_and_root( - cp_height + 1, height) + branch, root = await self.db.header_branch_and_root(cp_height + 1, + height) return { 'branch': [hash_to_hex_str(elt) for elt in branch], 'root': hash_to_hex_str(root), @@ -953,8 +1004,7 @@ class ElectrumX(SessionBase): max_size = self.MAX_CHUNK_SIZE count = min(count, max_size) - headers, count = await self.chain_state.read_headers(start_height, - count) + headers, count = await self.db.read_headers(start_height, count) result = {'hex': headers.hex(), 'count': count, 'max': max_size} if count and cp_height: last_height = start_height + count - 1 @@ -971,7 +1021,7 @@ class ElectrumX(SessionBase): index = non_negative_integer(index) size = self.coin.CHUNK_SIZE start_height = index * size - headers, _ = await self.chain_state.read_headers(start_height, size) + headers, _ = await self.db.read_headers(start_height, size) return headers.hex() async def block_get_header(self, height): @@ -1091,11 +1141,10 @@ class ElectrumX(SessionBase): raw_tx: the raw transaction as a hexadecimal string''' # This returns errors as JSON RPC errors, as is natural try: - tx_hash = await self.chain_state.broadcast_transaction(raw_tx) + hex_hash = await self.session_mgr.broadcast_transaction(raw_tx) self.txs_sent += 1 - self.session_mgr.txs_sent += 1 - self.logger.info('sent tx: {}'.format(tx_hash)) - return tx_hash + self.logger.info(f'sent tx: {hex_hash}') + return hex_hash except DaemonError as e: error, = e.args message = error['message'] @@ -1135,7 +1184,7 @@ class ElectrumX(SessionBase): tx_pos: index of transaction in tx_hashes to create branch for ''' hashes = [hex_str_to_hash(hash) for hash in tx_hashes] - branch, root = self.chain_state.tx_branch_and_root(hashes, tx_pos) + branch, root = self.db.merkle.branch_and_root(hashes, tx_pos) branch = [hash_to_hex_str(hash) for hash in branch] return branch @@ -1358,7 +1407,7 @@ class DashElectrumX(ElectrumX): # with the masternode information including the payment # position is returned. cache = self.session_mgr.mn_cache - if not cache or self.session_mgr.mn_cache_height != self.db_height(): + if not cache or self.session_mgr.mn_cache_height != self.db.db_height: full_mn_list = await self.daemon_request('masternode_list', ['full']) mn_payment_queue = get_masternode_payment_queue(full_mn_list) @@ -1386,7 +1435,7 @@ class DashElectrumX(ElectrumX): mn_list.append(mn_info) cache.clear() cache.extend(mn_list) - self.session_mgr.mn_cache_height = self.db_height() + self.session_mgr.mn_cache_height = self.db.db_height # If payees is an empty list the whole masternode list is returned if payees: