Get rid of chain state

Neil Booth 2018-08-09 15:22:17 +09:00
parent c69a740dda
commit f0f5aa3ee7
5 changed files with 106 additions and 154 deletions

electrumx/server/chain_state.py (deleted)

@@ -1,102 +0,0 @@
# Copyright (c) 2016-2018, Neil Booth
#
# All rights reserved.
#
# See the file "LICENCE" for information about the copyright
# and warranty status of this software.
from electrumx.lib.hash import hash_to_hex_str
class ChainState(object):
'''Used as an interface by servers to request information about
blocks, transaction history, UTXOs and the mempool.
'''
def __init__(self, env, db, daemon, bp):
self._env = env
self._db = db
self._daemon = daemon
# External interface pass-throughs for session.py
self.force_chain_reorg = bp.force_chain_reorg
self.tx_branch_and_root = db.merkle.branch_and_root
self.read_headers = db.read_headers
self.all_utxos = db.all_utxos
self.limited_history = db.limited_history
self.header_branch_and_root = db.header_branch_and_root
async def broadcast_transaction(self, raw_tx):
return await self._daemon.sendrawtransaction([raw_tx])
async def daemon_request(self, method, args=()):
return await getattr(self._daemon, method)(*args)
def db_height(self):
return self._db.db_height
def get_info(self):
'''Chain state info for LocalRPC and logs.'''
return {
'daemon': self._daemon.logged_url(),
'daemon_height': self._daemon.cached_height(),
'db_height': self.db_height(),
}
async def raw_header(self, height):
'''Return the binary header at the given height.'''
header, n = await self.read_headers(height, 1)
if n != 1:
raise IndexError(f'height {height:,d} out of range')
return header
def set_daemon_url(self, daemon_url):
self._daemon.set_urls(self._env.coin.daemon_urls(daemon_url))
return self._daemon.logged_url()
async def query(self, args, limit):
coin = self._env.coin
db = self._db
lines = []
def arg_to_hashX(arg):
try:
script = bytes.fromhex(arg)
lines.append(f'Script: {arg}')
return coin.hashX_from_script(script)
except ValueError:
pass
hashX = coin.address_to_hashX(arg)
lines.append(f'Address: {arg}')
return hashX
for arg in args:
hashX = arg_to_hashX(arg)
if not hashX:
continue
n = None
history = await db.limited_history(hashX, limit=limit)
for n, (tx_hash, height) in enumerate(history):
lines.append(f'History #{n:,d}: height {height:,d} '
f'tx_hash {hash_to_hex_str(tx_hash)}')
if n is None:
lines.append('No history found')
n = None
utxos = await db.all_utxos(hashX)
for n, utxo in enumerate(utxos, start=1):
lines.append(f'UTXO #{n:,d}: tx_hash '
f'{hash_to_hex_str(utxo.tx_hash)} '
f'tx_pos {utxo.tx_pos:,d} height '
f'{utxo.height:,d} value {utxo.value:,d}')
if n == limit:
break
if n is None:
lines.append('No UTXOs found')
balance = sum(utxo.value for utxo in utxos)
lines.append(f'Balance: {coin.decimal_value(balance):,f} '
f'{coin.SHORTNAME}')
return lines
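With this file deleted, the ChainState facade above disappears entirely: sessions and the peer manager now hold the DB, daemon and block processor themselves. A minimal sketch, assuming a DB instance wired up as in controller.py below, of what a former ChainState caller looks like after the change (the helper name is illustrative, not from the commit):

    async def describe_tip(db):
        # db_height is now read as a plain attribute rather than through
        # ChainState.db_height(); raw_header moves onto DB (see the db.py hunk).
        height = db.db_height
        header = await db.raw_header(height)
        return {'height': height, 'raw_header_hex': header.hex()}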

electrumx/server/controller.py

@@ -12,7 +12,6 @@ from aiorpcx import _version as aiorpcx_version, TaskGroup
import electrumx
from electrumx.lib.server_base import ServerBase
from electrumx.lib.util import version_string
from electrumx.server.chain_state import ChainState
from electrumx.server.db import DB
from electrumx.server.mempool import MemPool, MemPoolAPI
from electrumx.server.session import SessionManager
@@ -97,7 +96,6 @@ class Controller(ServerBase):
db = DB(env)
BlockProcessor = env.coin.BLOCK_PROCESSOR
bp = BlockProcessor(env, db, daemon, notifications)
chain_state = ChainState(env, db, daemon, bp)
# Set ourselves up to implement the MemPoolAPI
self.height = daemon.height
@@ -109,7 +107,7 @@
MemPoolAPI.register(Controller)
mempool = MemPool(env.coin, self)
session_mgr = SessionManager(env, chain_state, mempool,
session_mgr = SessionManager(env, db, bp, daemon, mempool,
notifications, shutdown_event)
caught_up_event = Event()
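Condensed, this hunk leaves the server wiring looking roughly as follows; the sketch only rearranges names that already appear in the diff and omits the surrounding method body:

    db = DB(env)
    bp = env.coin.BLOCK_PROCESSOR(env, db, daemon, notifications)
    MemPoolAPI.register(Controller)      # the Controller itself implements MemPoolAPI
    mempool = MemPool(env.coin, self)
    session_mgr = SessionManager(env, db, bp, daemon, mempool,
                                 notifications, shutdown_event)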

electrumx/server/db.py

@@ -370,6 +370,13 @@ class DB(object):
# Truncate header_mc: header count is 1 more than the height.
self.header_mc.truncate(height + 1)
async def raw_header(self, height):
'''Return the binary header at the given height.'''
header, n = await self.read_headers(height, 1)
if n != 1:
raise IndexError(f'height {height:,d} out of range')
return header
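raw_header moves from ChainState onto DB unchanged; it still raises IndexError for an out-of-range height, which callers are expected to handle. A hedged usage sketch (the wrapper is illustrative; session.py's ElectrumX.raw_header makes the same call but converts the IndexError into an RPCError instead):

    async def safe_raw_header(db, height):
        # Simplified choice for illustration: swallow the out-of-range case.
        try:
            return await db.raw_header(height)
        except IndexError:
            return None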
async def read_headers(self, start_height, count):
'''Requires start_height >= 0, count >= 0. Reads as many headers as
are available starting at start_height up to count. This

electrumx/server/peers.py

@@ -55,12 +55,12 @@ class PeerManager(object):
Attempts to maintain a connection with up to 8 peers.
Issues a 'peers.subscribe' RPC to them and tells them our data.
'''
def __init__(self, env, chain_state):
def __init__(self, env, db):
self.logger = class_logger(__name__, self.__class__.__name__)
# Initialise the Peer class
Peer.DEFAULT_PORTS = env.coin.PEER_DEFAULT_PORTS
self.env = env
self.chain_state = chain_state
self.db = db
# Our clearnet and Tor Peers, if any
sclass = env.coin.SESSIONCLS
@@ -300,7 +300,7 @@ class PeerManager(object):
result = await session.send_request(message)
assert_good(message, result, dict)
our_height = self.chain_state.db_height()
our_height = self.db.db_height
if ptuple < (1, 3):
their_height = result.get('block_height')
else:
@@ -313,7 +313,7 @@
# Check prior header too in case of hard fork.
check_height = min(our_height, their_height)
raw_header = await self.chain_state.raw_header(check_height)
raw_header = await self.db.raw_header(check_height)
if ptuple >= (1, 4):
ours = raw_header.hex()
message = 'blockchain.block.header'
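PeerManager now takes the DB instead of a ChainState and reads the height and headers from it directly. A sketch of the verification step above with the indirection removed (the function name and boolean return are illustrative; the real code compares against the peer's blockchain.block.header response):

    async def headers_match(db, their_height, their_header_hex):
        # Check the lower of our height and the peer's, as in the hunk above.
        check_height = min(db.db_height, their_height)
        ours = (await db.raw_header(check_height)).hex()
        return ours == their_header_hex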

electrumx/server/session.py

@@ -109,13 +109,15 @@ class SessionManager(object):
CATCHING_UP, LISTENING, PAUSED, SHUTTING_DOWN = range(4)
def __init__(self, env, chain_state, mempool, notifications,
def __init__(self, env, db, bp, daemon, mempool, notifications,
shutdown_event):
env.max_send = max(350000, env.max_send)
self.env = env
self.chain_state = chain_state
self.db = db
self.bp = bp
self.daemon = daemon
self.mempool = mempool
self.peer_mgr = PeerManager(env, chain_state)
self.peer_mgr = PeerManager(env, db)
self.shutdown_event = shutdown_event
self.logger = util.class_logger(__name__, self.__class__.__name__)
self.servers = {}
@@ -152,7 +154,7 @@
protocol_class = LocalRPC
else:
protocol_class = self.env.coin.SESSIONCLS
protocol_factory = partial(protocol_class, self, self.chain_state,
protocol_factory = partial(protocol_class, self, self.db,
self.mempool, self.peer_mgr, kind)
server = loop.create_server(protocol_factory, *args, **kw_args)
@@ -276,10 +278,11 @@
def _get_info(self):
'''A summary of server state.'''
group_map = self._group_map()
result = self.chain_state.get_info()
result.update({
'version': electrumx.version,
return {
'closing': len([s for s in self.sessions if s.is_closing()]),
'daemon': self.daemon.logged_url(),
'daemon_height': self.daemon.cached_height(),
'db_height': self.db.db_height,
'errors': sum(s.errors for s in self.sessions),
'groups': len(group_map),
'logged': len([s for s in self.sessions if s.log_me]),
@@ -291,8 +294,8 @@
'subs': self._sub_count(),
'txs_sent': self.txs_sent,
'uptime': util.formatted_time(time.time() - self.start_time),
})
return result
'version': electrumx.version,
}
def _session_data(self, for_log):
'''Returned to the RPC 'sessions' call.'''
@@ -367,10 +370,10 @@
'''Replace the daemon URL.'''
daemon_url = daemon_url or self.env.daemon_url
try:
daemon_url = self.chain_state.set_daemon_url(daemon_url)
self.daemon.set_urls(self.env.coin.daemon_urls(daemon_url))
except Exception as e:
raise RPCError(BAD_REQUEST, f'an error occurred: {e!r}')
return f'now using daemon at {daemon_url}'
return f'now using daemon at {self.daemon.logged_url()}'
async def rpc_stop(self):
'''Shut down the server cleanly.'''
@@ -391,10 +394,54 @@
async def rpc_query(self, items, limit):
'''Return history, UTXO and balance information about the given addresses or scripts.'''
try:
return await self.chain_state.query(items, limit)
except Base58Error as e:
raise RPCError(BAD_REQUEST, e.args[0]) from None
coin = self.env.coin
db = self.db
lines = []
def arg_to_hashX(arg):
try:
script = bytes.fromhex(arg)
lines.append(f'Script: {arg}')
return coin.hashX_from_script(script)
except ValueError:
pass
try:
hashX = coin.address_to_hashX(arg)
except Base58Error as e:
lines.append(e.args[0])
return None
lines.append(f'Address: {arg}')
return hashX
for arg in items:
hashX = arg_to_hashX(arg)
if not hashX:
continue
n = None
history = await db.limited_history(hashX, limit=limit)
for n, (tx_hash, height) in enumerate(history):
lines.append(f'History #{n:,d}: height {height:,d} '
f'tx_hash {hash_to_hex_str(tx_hash)}')
if n is None:
lines.append('No history found')
n = None
utxos = await db.all_utxos(hashX)
for n, utxo in enumerate(utxos, start=1):
lines.append(f'UTXO #{n:,d}: tx_hash '
f'{hash_to_hex_str(utxo.tx_hash)} '
f'tx_pos {utxo.tx_pos:,d} height '
f'{utxo.height:,d} value {utxo.value:,d}')
if n == limit:
break
if n is None:
lines.append('No UTXOs found')
balance = sum(utxo.value for utxo in utxos)
lines.append(f'Balance: {coin.decimal_value(balance):,f} '
f'{coin.SHORTNAME}')
return lines
async def rpc_sessions(self):
'''Return statistics about connected sessions.'''
@@ -406,7 +453,7 @@
count: number of blocks to reorg
'''
count = non_negative_integer(count)
if not self.chain_state.force_chain_reorg(count):
if not self.bp.force_chain_reorg(count):
raise RPCError(BAD_REQUEST, 'still catching up with daemon')
return f'scheduled a reorg of {count:,d} blocks'
@@ -454,6 +501,18 @@
'''The number of connections that we've sent something to.'''
return len(self.sessions)
async def daemon_request(self, method, *args):
'''Catch a DaemonError and convert it to an RPCError.'''
try:
return await getattr(self.daemon, method)(*args)
except DaemonError as e:
raise RPCError(DAEMON_ERROR, f'daemon error: {e!r}') from None
async def broadcast_transaction(self, raw_tx):
hex_hash = await self.daemon.sendrawtransaction([raw_tx])
self.txs_sent += 1
return hex_hash
async def limited_history(self, hashX):
'''A caching layer.'''
hc = self._history_cache
@@ -463,8 +522,7 @@
# on bloated history requests, and uses a smaller divisor
# so large requests are logged before refusing them.
limit = self.env.max_send // 97
hc[hashX] = await self.chain_state.limited_history(hashX,
limit=limit)
hc[hashX] = await self.db.limited_history(hashX, limit=limit)
return hc[hashX]
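The daemon plumbing that sessions used to reach through ChainState now lives on SessionManager: daemon_request wraps a DaemonError into an RPCError, broadcast_transaction counts sent transactions, and limited_history keeps its per-hashX cache with the max_send // 97 limit. SessionBase aliases daemon_request in its __init__ further down, so a session handler ends up looking roughly like this hedged sketch (estimatefee is shown purely as an example of a proxied daemon call, not a literal copy from this commit):

    # On an ElectrumX-style session class:
    async def estimatefee(self, number):
        # Resolves to SessionManager.daemon_request, which re-raises any
        # DaemonError as RPCError(DAEMON_ERROR, ...).
        return await self.daemon_request('estimatefee', number)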
async def _notify_sessions(self, height, touched):
@@ -518,12 +576,12 @@ class SessionBase(ServerSession):
MAX_CHUNK_SIZE = 2016
session_counter = itertools.count()
def __init__(self, session_mgr, chain_state, mempool, peer_mgr, kind):
def __init__(self, session_mgr, db, mempool, peer_mgr, kind):
connection = JSONRPCConnection(JSONRPCAutoDetect)
super().__init__(connection=connection)
self.logger = util.class_logger(__name__, self.__class__.__name__)
self.session_mgr = session_mgr
self.chain_state = chain_state
self.db = db
self.mempool = mempool
self.peer_mgr = peer_mgr
self.kind = kind # 'RPC', 'TCP' etc.
@@ -534,6 +592,7 @@ class SessionBase(ServerSession):
self.txs_sent = 0
self.log_me = False
self.bw_limit = self.env.bandwidth_limit
self.daemon_request = self.session_mgr.daemon_request
# Hijack the connection so we can log messages
self._receive_message_orig = self.connection.receive_message
self.connection.receive_message = self.receive_message
@@ -630,7 +689,6 @@ class ElectrumX(SessionBase):
self.sv_seen = False
self.mempool_statuses = {}
self.set_request_handlers(self.PROTOCOL_MIN)
self.db_height = self.chain_state.db_height
@classmethod
def protocol_min_max_strings(cls):
@@ -662,13 +720,6 @@
def protocol_version_string(self):
return util.version_string(self.protocol_tuple)
async def daemon_request(self, method, *args):
'''Catch a DaemonError and convert it to an RPCError.'''
try:
return await self.chain_state.daemon_request(method, args)
except DaemonError as e:
raise RPCError(DAEMON_ERROR, f'daemon error: {e!r}') from None
def sub_count(self):
return len(self.hashX_subs)
@@ -729,7 +780,7 @@
async def raw_header(self, height):
'''Return the binary header at the given height.'''
try:
return await self.chain_state.raw_header(height)
return await self.db.raw_header(height)
except IndexError:
raise RPCError(BAD_REQUEST, f'height {height:,d} '
'out of range') from None
@@ -750,7 +801,7 @@
'''Subscribe to get headers of new blocks.'''
self.subscribe_headers = True
self.subscribe_headers_raw = assert_boolean(raw)
self.notified_height = self.db_height()
self.notified_height = self.db.db_height
return await self.subscribe_headers_result(self.notified_height)
async def headers_subscribe(self):
@@ -804,7 +855,7 @@
async def hashX_listunspent(self, hashX):
'''Return the list of UTXOs of a script hash, including mempool
effects.'''
utxos = await self.chain_state.all_utxos(hashX)
utxos = await self.db.all_utxos(hashX)
utxos = sorted(utxos)
utxos.extend(await self.mempool.unordered_UTXOs(hashX))
spends = await self.mempool.potential_spends(hashX)
@@ -861,7 +912,7 @@
return await self.hashX_subscribe(hashX, address)
async def get_balance(self, hashX):
utxos = await self.chain_state.all_utxos(hashX)
utxos = await self.db.all_utxos(hashX)
confirmed = sum(utxo.value for utxo in utxos)
unconfirmed = await self.mempool.balance_delta(hashX)
return {'confirmed': confirmed, 'unconfirmed': unconfirmed}
@@ -909,14 +960,14 @@
return await self.hashX_subscribe(hashX, scripthash)
async def _merkle_proof(self, cp_height, height):
max_height = self.db_height()
max_height = self.db.db_height
if not height <= cp_height <= max_height:
raise RPCError(BAD_REQUEST,
f'require header height {height:,d} <= '
f'cp_height {cp_height:,d} <= '
f'chain height {max_height:,d}')
branch, root = await self.chain_state.header_branch_and_root(
cp_height + 1, height)
branch, root = await self.db.header_branch_and_root(cp_height + 1,
height)
return {
'branch': [hash_to_hex_str(elt) for elt in branch],
'root': hash_to_hex_str(root),
@@ -953,8 +1004,7 @@
max_size = self.MAX_CHUNK_SIZE
count = min(count, max_size)
headers, count = await self.chain_state.read_headers(start_height,
count)
headers, count = await self.db.read_headers(start_height, count)
result = {'hex': headers.hex(), 'count': count, 'max': max_size}
if count and cp_height:
last_height = start_height + count - 1
@@ -971,7 +1021,7 @@
index = non_negative_integer(index)
size = self.coin.CHUNK_SIZE
start_height = index * size
headers, _ = await self.chain_state.read_headers(start_height, size)
headers, _ = await self.db.read_headers(start_height, size)
return headers.hex()
async def block_get_header(self, height):
@@ -1091,11 +1141,10 @@
raw_tx: the raw transaction as a hexadecimal string'''
# This returns errors as JSON RPC errors, as is natural
try:
tx_hash = await self.chain_state.broadcast_transaction(raw_tx)
hex_hash = await self.session_mgr.broadcast_transaction(raw_tx)
self.txs_sent += 1
self.session_mgr.txs_sent += 1
self.logger.info('sent tx: {}'.format(tx_hash))
return tx_hash
self.logger.info(f'sent tx: {hex_hash}')
return hex_hash
except DaemonError as e:
error, = e.args
message = error['message']
@@ -1135,7 +1184,7 @@
tx_pos: index of transaction in tx_hashes to create branch for
'''
hashes = [hex_str_to_hash(hash) for hash in tx_hashes]
branch, root = self.chain_state.tx_branch_and_root(hashes, tx_pos)
branch, root = self.db.merkle.branch_and_root(hashes, tx_pos)
branch = [hash_to_hex_str(hash) for hash in branch]
return branch
@@ -1358,7 +1407,7 @@
# with the masternode information including the payment
# position is returned.
cache = self.session_mgr.mn_cache
if not cache or self.session_mgr.mn_cache_height != self.db_height():
if not cache or self.session_mgr.mn_cache_height != self.db.db_height:
full_mn_list = await self.daemon_request('masternode_list',
['full'])
mn_payment_queue = get_masternode_payment_queue(full_mn_list)
@@ -1386,7 +1435,7 @@
mn_list.append(mn_info)
cache.clear()
cache.extend(mn_list)
self.session_mgr.mn_cache_height = self.db_height()
self.session_mgr.mn_cache_height = self.db.db_height
# If payees is an empty list the whole masternode list is returned
if payees: