Introduce ChainState object
- reduces the tangle of object dependencies - rationalizes responsibilities
This commit is contained in:
parent
53425ce585
commit
391e69b66c
112
electrumx/server/chain_state.py
Normal file
112
electrumx/server/chain_state.py
Normal file
@ -0,0 +1,112 @@
|
||||
# Copyright (c) 2016-2018, Neil Booth
|
||||
#
|
||||
# All rights reserved.
|
||||
#
|
||||
# See the file "LICENCE" for information about the copyright
|
||||
# and warranty status of this software.
|
||||
|
||||
|
||||
import pylru
|
||||
|
||||
from electrumx.server.mempool import MemPool
|
||||
|
||||
|
||||
class ChainState(object):
|
||||
'''Used as an interface by servers to request information about
|
||||
blocks, transaction history, UTXOs and the mempool.
|
||||
'''
|
||||
|
||||
def __init__(self, env, tasks, shutdown_event):
|
||||
self.env = env
|
||||
self.tasks = tasks
|
||||
self.shutdown_event = shutdown_event
|
||||
self.daemon = env.coin.DAEMON(env)
|
||||
self.bp = env.coin.BLOCK_PROCESSOR(env, tasks, self.daemon)
|
||||
self.mempool = MemPool(env.coin, self, self.tasks,
|
||||
self.bp.add_new_block_callback)
|
||||
self.history_cache = pylru.lrucache(256)
|
||||
# External interface: pass-throughs for mempool.py
|
||||
self.cached_mempool_hashes = self.daemon.cached_mempool_hashes
|
||||
self.mempool_refresh_event = self.daemon.mempool_refresh_event
|
||||
self.getrawtransactions = self.daemon.getrawtransactions
|
||||
self.utxo_lookup = self.bp.db_utxo_lookup
|
||||
# External interface pass-throughs for session.py
|
||||
self.force_chain_reorg = self.bp.force_chain_reorg
|
||||
self.mempool_fee_histogram = self.mempool.get_fee_histogram
|
||||
self.mempool_get_utxos = self.mempool.get_utxos
|
||||
self.mempool_potential_spends = self.mempool.potential_spends
|
||||
self.mempool_transactions = self.mempool.transactions
|
||||
self.mempool_value = self.mempool.value
|
||||
self.tx_branch_and_root = self.bp.merkle.branch_and_root
|
||||
self.read_headers = self.bp.read_headers
|
||||
|
||||
async def broadcast_transaction(self, raw_tx):
|
||||
return await self.daemon.sendrawtransaction([raw_tx])
|
||||
|
||||
async def daemon_request(self, method, args):
|
||||
return await getattr(self.daemon, method)(*args)
|
||||
|
||||
def db_height(self):
|
||||
return self.bp.db_height
|
||||
|
||||
def get_info(self):
|
||||
'''Chain state info for LocalRPC and logs.'''
|
||||
return {
|
||||
'daemon': self.daemon.logged_url(),
|
||||
'daemon_height': self.daemon.cached_height(),
|
||||
'db_height': self.db_height(),
|
||||
}
|
||||
|
||||
async def get_history(self, hashX):
|
||||
'''Get history asynchronously to reduce latency.'''
|
||||
def job():
|
||||
# History DoS limit. Each element of history is about 99
|
||||
# bytes when encoded as JSON. This limits resource usage
|
||||
# on bloated history requests, and uses a smaller divisor
|
||||
# so large requests are logged before refusing them.
|
||||
limit = self.env.max_send // 97
|
||||
return list(self.bp.get_history(hashX, limit=limit))
|
||||
|
||||
hc = self.history_cache
|
||||
if hashX not in hc:
|
||||
hc[hashX] = await self.tasks.run_in_thread(job)
|
||||
return hc[hashX]
|
||||
|
||||
async def get_utxos(self, hashX):
|
||||
'''Get UTXOs asynchronously to reduce latency.'''
|
||||
def job():
|
||||
return list(self.bp.get_utxos(hashX, limit=None))
|
||||
|
||||
return await self.tasks.run_in_thread(job)
|
||||
|
||||
def header_branch_and_root(self, length, height):
|
||||
return self.bp.header_mc.branch_and_root(length, height)
|
||||
|
||||
def invalidate_history_cache(self, touched):
|
||||
hc = self.history_cache
|
||||
for hashX in set(hc).intersection(touched):
|
||||
del hc[hashX]
|
||||
|
||||
def processing_new_block(self):
|
||||
'''Return True if we're processing a new block.'''
|
||||
return self.daemon.cached_height() > self.db_height()
|
||||
|
||||
def raw_header(self, height):
|
||||
'''Return the binary header at the given height.'''
|
||||
header, n = self.bp.read_headers(height, 1)
|
||||
if n != 1:
|
||||
raise IndexError(f'height {height:,d} out of range')
|
||||
return header
|
||||
|
||||
def set_daemon_url(self, daemon_url):
|
||||
self.daemon.set_urls(self.env.coin.daemon_urls(daemon_url))
|
||||
return self.daemon.logged_url()
|
||||
|
||||
def shutdown(self):
|
||||
self.tasks.loop.call_soon(self.shutdown_event.set)
|
||||
|
||||
async def wait_for_mempool(self):
|
||||
self.tasks.create_task(self.bp.main_loop())
|
||||
await self.bp.caught_up_event.wait()
|
||||
self.tasks.create_task(self.mempool.main_loop())
|
||||
await self.mempool.synchronized_event.wait()
|
||||
@ -10,16 +10,16 @@ import electrumx
|
||||
from electrumx.lib.server_base import ServerBase
|
||||
from electrumx.lib.tasks import Tasks
|
||||
from electrumx.lib.util import version_string
|
||||
from electrumx.server.mempool import MemPool
|
||||
from electrumx.server.chain_state import ChainState
|
||||
from electrumx.server.peers import PeerManager
|
||||
from electrumx.server.session import SessionManager
|
||||
|
||||
|
||||
class Controller(ServerBase):
|
||||
'''Manages the client servers, a mempool, and a block processor.
|
||||
'''Manages server initialisation and stutdown.
|
||||
|
||||
Servers are started immediately the block processor first catches
|
||||
up with the daemon.
|
||||
Servers are started once the mempool is synced after the block
|
||||
processor first catches up with the daemon.
|
||||
'''
|
||||
|
||||
AIORPCX_MIN = (0, 5, 6)
|
||||
@ -41,20 +41,20 @@ class Controller(ServerBase):
|
||||
env.max_send = max(350000, env.max_send)
|
||||
|
||||
self.tasks = Tasks()
|
||||
self.session_mgr = SessionManager(env, self.tasks, self)
|
||||
self.daemon = env.coin.DAEMON(env)
|
||||
self.bp = env.coin.BLOCK_PROCESSOR(env, self.tasks, self.daemon)
|
||||
self.mempool = MemPool(self.bp, self.daemon, self.tasks,
|
||||
self.session_mgr.notify_sessions)
|
||||
self.peer_mgr = PeerManager(env, self.tasks, self.session_mgr, self.bp)
|
||||
self.chain_state = ChainState(env, self.tasks, self.shutdown_event)
|
||||
self.peer_mgr = PeerManager(env, self.tasks, self.chain_state)
|
||||
self.session_mgr = SessionManager(env, self.tasks, self.chain_state,
|
||||
self.peer_mgr)
|
||||
|
||||
async def start_servers(self):
|
||||
'''Start the RPC server and schedule the external servers to be
|
||||
started once the block processor has caught up.
|
||||
'''Start the RPC server and wait for the mempool to synchronize. Then
|
||||
start the peer manager and serving external clients.
|
||||
'''
|
||||
await self.session_mgr.start_rpc_server()
|
||||
self.tasks.create_task(self.bp.main_loop())
|
||||
self.tasks.create_task(self.wait_for_bp_catchup())
|
||||
await self.chain_state.wait_for_mempool()
|
||||
self.tasks.create_task(self.peer_mgr.main_loop())
|
||||
self.tasks.create_task(self.session_mgr.start_serving())
|
||||
self.tasks.create_task(self.session_mgr.housekeeping())
|
||||
|
||||
async def shutdown(self):
|
||||
'''Perform the shutdown sequence.'''
|
||||
@ -63,29 +63,4 @@ class Controller(ServerBase):
|
||||
await self.session_mgr.shutdown()
|
||||
await self.tasks.wait()
|
||||
# Finally shut down the block processor and executor (FIXME)
|
||||
self.bp.shutdown(self.tasks.executor)
|
||||
|
||||
async def mempool_transactions(self, hashX):
|
||||
'''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
|
||||
entries for the hashX.
|
||||
|
||||
unconfirmed is True if any txin is unconfirmed.
|
||||
'''
|
||||
return await self.mempool.transactions(hashX)
|
||||
|
||||
def mempool_value(self, hashX):
|
||||
'''Return the unconfirmed amount in the mempool for hashX.
|
||||
|
||||
Can be positive or negative.
|
||||
'''
|
||||
return self.mempool.value(hashX)
|
||||
|
||||
async def wait_for_bp_catchup(self):
|
||||
'''Wait for the block processor to catch up, and for the mempool to
|
||||
synchronize, then kick off server background processes.'''
|
||||
await self.bp.caught_up_event.wait()
|
||||
self.tasks.create_task(self.mempool.main_loop())
|
||||
await self.mempool.synchronized_event.wait()
|
||||
self.tasks.create_task(self.peer_mgr.main_loop())
|
||||
self.tasks.create_task(self.session_mgr.start_serving())
|
||||
self.tasks.create_task(self.session_mgr.housekeeping())
|
||||
self.chain_state.bp.shutdown(self.tasks.executor)
|
||||
|
||||
@ -15,7 +15,7 @@ from collections import defaultdict
|
||||
from electrumx.lib.hash import hash_to_hex_str, hex_str_to_hash
|
||||
from electrumx.lib.util import class_logger
|
||||
from electrumx.server.daemon import DaemonError
|
||||
from electrumx.server.db import UTXO
|
||||
from electrumx.server.db import UTXO, DB
|
||||
|
||||
|
||||
class MemPool(object):
|
||||
@ -32,13 +32,11 @@ class MemPool(object):
|
||||
A pair is a (hashX, value) tuple. tx hashes are hex strings.
|
||||
'''
|
||||
|
||||
def __init__(self, db, daemon, tasks, notify_sessions):
|
||||
def __init__(self, coin, chain_state, tasks, add_new_block_callback):
|
||||
self.logger = class_logger(__name__, self.__class__.__name__)
|
||||
self.db = db
|
||||
self.daemon = daemon
|
||||
self.coin = coin
|
||||
self.chain_state = chain_state
|
||||
self.tasks = tasks
|
||||
self.notify_sessions = notify_sessions
|
||||
self.coin = db.coin
|
||||
self.touched = set()
|
||||
self.stop = False
|
||||
self.txs = {}
|
||||
@ -47,7 +45,7 @@ class MemPool(object):
|
||||
self.fee_histogram = defaultdict(int)
|
||||
self.compact_fee_histogram = []
|
||||
self.histogram_time = 0
|
||||
db.add_new_block_callback(self.on_new_block)
|
||||
add_new_block_callback(self.on_new_block)
|
||||
|
||||
def _resync_daemon_hashes(self, unprocessed, unfetched):
|
||||
'''Re-sync self.txs with the list of hashes in the daemon's mempool.
|
||||
@ -60,7 +58,7 @@ class MemPool(object):
|
||||
touched = self.touched
|
||||
fee_hist = self.fee_histogram
|
||||
|
||||
hashes = self.daemon.cached_mempool_hashes()
|
||||
hashes = self.chain_state.cached_mempool_hashes()
|
||||
gone = set(txs).difference(hashes)
|
||||
for hex_hash in gone:
|
||||
unfetched.discard(hex_hash)
|
||||
@ -99,13 +97,13 @@ class MemPool(object):
|
||||
|
||||
self.logger.info('beginning processing of daemon mempool. '
|
||||
'This can take some time...')
|
||||
await self.daemon.mempool_refresh_event.wait()
|
||||
await self.chain_state.mempool_refresh_event.wait()
|
||||
next_log = 0
|
||||
loops = -1 # Zero during initial catchup
|
||||
|
||||
while True:
|
||||
# Avoid double notifications if processing a block
|
||||
if self.touched and not self.processing_new_block():
|
||||
if self.touched and not self.chain_state.processing_new_block():
|
||||
self.notify_sessions(self.touched)
|
||||
self.touched.clear()
|
||||
|
||||
@ -127,10 +125,10 @@ class MemPool(object):
|
||||
|
||||
try:
|
||||
if not todo:
|
||||
await self.daemon.mempool_refresh_event.wait()
|
||||
await self.chain_state.mempool_refresh_event.wait()
|
||||
|
||||
self._resync_daemon_hashes(unprocessed, unfetched)
|
||||
self.daemon.mempool_refresh_event.clear()
|
||||
self.chain_state.mempool_refresh_event.clear()
|
||||
|
||||
if unfetched:
|
||||
count = min(len(unfetched), fetch_size)
|
||||
@ -196,13 +194,9 @@ class MemPool(object):
|
||||
self.touched.clear()
|
||||
self.notify_sessions(touched)
|
||||
|
||||
def processing_new_block(self):
|
||||
'''Return True if we're processing a new block.'''
|
||||
return self.daemon.cached_height() > self.db.db_height
|
||||
|
||||
async def fetch_raw_txs(self, hex_hashes):
|
||||
'''Fetch a list of mempool transactions.'''
|
||||
raw_txs = await self.daemon.getrawtransactions(hex_hashes)
|
||||
raw_txs = await self.chain_state.getrawtransactions(hex_hashes)
|
||||
|
||||
# Skip hashes the daemon has dropped. Either they were
|
||||
# evicted or they got in a block.
|
||||
@ -218,7 +212,6 @@ class MemPool(object):
|
||||
'''
|
||||
script_hashX = self.coin.hashX_from_script
|
||||
deserializer = self.coin.DESERIALIZER
|
||||
db_utxo_lookup = self.db.db_utxo_lookup
|
||||
txs = self.txs
|
||||
|
||||
# Deserialize each tx and put it in a pending list
|
||||
@ -240,6 +233,7 @@ class MemPool(object):
|
||||
# Now process what we can
|
||||
result = {}
|
||||
deferred = []
|
||||
utxo_lookup = self.chain_state.utxo_lookup
|
||||
|
||||
for item in pending:
|
||||
if self.stop:
|
||||
@ -264,8 +258,8 @@ class MemPool(object):
|
||||
txin_pairs.append(tx_info[1][prev_idx])
|
||||
elif not mempool_missing:
|
||||
prev_hash = hex_str_to_hash(prev_hex_hash)
|
||||
txin_pairs.append(db_utxo_lookup(prev_hash, prev_idx))
|
||||
except (self.db.MissingUTXOError, self.db.DBError):
|
||||
txin_pairs.append(utxo_lookup(prev_hash, prev_idx))
|
||||
except (DB.MissingUTXOError, DB.DBError):
|
||||
# DBError can happen when flushing a newly processed
|
||||
# block. MissingUTXOError typically happens just
|
||||
# after the daemon has accepted a new block and the
|
||||
@ -293,7 +287,7 @@ class MemPool(object):
|
||||
return []
|
||||
|
||||
hex_hashes = self.hashXs[hashX]
|
||||
raw_txs = await self.daemon.getrawtransactions(hex_hashes)
|
||||
raw_txs = await self.chain_state.getrawtransactions(hex_hashes)
|
||||
return zip(hex_hashes, raw_txs)
|
||||
|
||||
async def transactions(self, hashX):
|
||||
|
||||
@ -141,7 +141,7 @@ class PeerSession(ClientSession):
|
||||
return
|
||||
|
||||
result = request.result()
|
||||
our_height = self.peer_mgr.bp.db_height
|
||||
our_height = self.peer_mgr.chain_state.db_height()
|
||||
if self.ptuple < (1, 3):
|
||||
their_height = result.get('block_height')
|
||||
else:
|
||||
@ -155,7 +155,7 @@ class PeerSession(ClientSession):
|
||||
return
|
||||
# Check prior header too in case of hard fork.
|
||||
check_height = min(our_height, their_height)
|
||||
raw_header = self.peer_mgr.session_mgr.raw_header(check_height)
|
||||
raw_header = self.peer_mgr.chain_state.raw_header(check_height)
|
||||
if self.ptuple >= (1, 4):
|
||||
self.send_request('blockchain.block.header', [check_height],
|
||||
partial(self.on_header, raw_header.hex()),
|
||||
@ -240,14 +240,13 @@ class PeerManager(object):
|
||||
Attempts to maintain a connection with up to 8 peers.
|
||||
Issues a 'peers.subscribe' RPC to them and tells them our data.
|
||||
'''
|
||||
def __init__(self, env, tasks, session_mgr, bp):
|
||||
def __init__(self, env, tasks, chain_state):
|
||||
self.logger = class_logger(__name__, self.__class__.__name__)
|
||||
# Initialise the Peer class
|
||||
Peer.DEFAULT_PORTS = env.coin.PEER_DEFAULT_PORTS
|
||||
self.env = env
|
||||
self.tasks = tasks
|
||||
self.session_mgr = session_mgr
|
||||
self.bp = bp
|
||||
self.chain_state = chain_state
|
||||
self.loop = tasks.loop
|
||||
|
||||
# Our clearnet and Tor Peers, if any
|
||||
|
||||
@ -18,7 +18,6 @@ import time
|
||||
from collections import defaultdict
|
||||
from functools import partial
|
||||
|
||||
import pylru
|
||||
from aiorpcx import ServerSession, JSONRPCAutoDetect, RPCError
|
||||
|
||||
import electrumx
|
||||
@ -98,10 +97,11 @@ class SessionManager(object):
|
||||
|
||||
CATCHING_UP, LISTENING, PAUSED, SHUTTING_DOWN = range(4)
|
||||
|
||||
def __init__(self, env, tasks, controller):
|
||||
def __init__(self, env, tasks, chain_state, peer_mgr):
|
||||
self.env = env
|
||||
self.tasks = tasks
|
||||
self.controller = controller
|
||||
self.chain_state = chain_state
|
||||
self.peer_mgr = peer_mgr
|
||||
self.logger = util.class_logger(__name__, self.__class__.__name__)
|
||||
self.servers = {}
|
||||
self.sessions = set()
|
||||
@ -113,7 +113,6 @@ class SessionManager(object):
|
||||
self.state = self.CATCHING_UP
|
||||
self.txs_sent = 0
|
||||
self.start_time = time.time()
|
||||
self.history_cache = pylru.lrucache(256)
|
||||
# Cache some idea of room to avoid recounting on each subscription
|
||||
self.subs_room = 0
|
||||
# Masternode stuff only for such coins
|
||||
@ -122,6 +121,8 @@ class SessionManager(object):
|
||||
self.mn_cache = []
|
||||
# Event triggered when electrumx is listening for incoming requests.
|
||||
self.server_listening = asyncio.Event()
|
||||
# FIXME
|
||||
chain_state.mempool.notify_sessions = self.notify_sessions
|
||||
# Set up the RPC request handlers
|
||||
cmds = ('add_peer daemon_url disconnect getinfo groups log peers '
|
||||
'reorg sessions stop'.split())
|
||||
@ -133,7 +134,8 @@ class SessionManager(object):
|
||||
protocol_class = LocalRPC
|
||||
else:
|
||||
protocol_class = self.env.coin.SESSIONCLS
|
||||
protocol_factory = partial(protocol_class, self, self.controller, kind)
|
||||
protocol_factory = partial(protocol_class, self, self.chain_state,
|
||||
self.peer_mgr, kind)
|
||||
server = loop.create_server(protocol_factory, *args, **kw_args)
|
||||
|
||||
host, port = args[:2]
|
||||
@ -237,30 +239,26 @@ class SessionManager(object):
|
||||
for session in group_map[group]:
|
||||
session.group = new_group
|
||||
|
||||
def _getinfo(self):
|
||||
'''A one-line summary of server state.'''
|
||||
def _get_info(self):
|
||||
'''A summary of server state.'''
|
||||
group_map = self._group_map()
|
||||
daemon = self.controller.daemon
|
||||
bp = self.controller.bp
|
||||
peer_mgr = self.controller.peer_mgr
|
||||
return {
|
||||
result = self.chain_state.get_info()
|
||||
result.update({
|
||||
'version': electrumx.version,
|
||||
'daemon': daemon.logged_url(),
|
||||
'daemon_height': daemon.cached_height(),
|
||||
'db_height': bp.db_height,
|
||||
'closing': len([s for s in self.sessions if s.is_closing()]),
|
||||
'errors': sum(s.rpc.errors for s in self.sessions),
|
||||
'groups': len(group_map),
|
||||
'logged': len([s for s in self.sessions if s.log_me]),
|
||||
'paused': sum(s.paused for s in self.sessions),
|
||||
'pid': os.getpid(),
|
||||
'peers': peer_mgr.info(),
|
||||
'peers': self.peer_mgr.info(),
|
||||
'requests': sum(s.count_pending_items() for s in self.sessions),
|
||||
'sessions': self.session_count(),
|
||||
'subs': self._sub_count(),
|
||||
'txs_sent': self.txs_sent,
|
||||
'uptime': util.formatted_time(time.time() - self.start_time),
|
||||
}
|
||||
})
|
||||
return result
|
||||
|
||||
def _session_data(self, for_log):
|
||||
'''Returned to the RPC 'sessions' call.'''
|
||||
@ -305,7 +303,7 @@ class SessionManager(object):
|
||||
real_name: a real name, as would appear on IRC
|
||||
'''
|
||||
peer = Peer.from_real_name(real_name, 'RPC')
|
||||
self.controller.peer_mgr.add_peers([peer])
|
||||
self.peer_mgr.add_peers([peer])
|
||||
return "peer '{}' added".format(real_name)
|
||||
|
||||
def rpc_disconnect(self, session_ids):
|
||||
@ -330,22 +328,20 @@ class SessionManager(object):
|
||||
def rpc_daemon_url(self, daemon_url=None):
|
||||
'''Replace the daemon URL.'''
|
||||
daemon_url = daemon_url or self.env.daemon_url
|
||||
daemon = self.controller.daemon
|
||||
try:
|
||||
daemon.set_urls(self.env.coin.daemon_urls(daemon_url))
|
||||
daemon_url = self.chain_state.set_daemon_url(daemon_url)
|
||||
except Exception as e:
|
||||
raise RPCError(BAD_REQUEST, f'an error occured: {e}')
|
||||
return 'now using daemon at {}'.format(daemon.logged_url())
|
||||
return f'now using daemon at {daemon_url}'
|
||||
|
||||
def rpc_stop(self):
|
||||
'''Shut down the server cleanly.'''
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.call_soon(self.controller.shutdown_event.set)
|
||||
self.chain_state.shutdown()
|
||||
return 'stopping'
|
||||
|
||||
def rpc_getinfo(self):
|
||||
'''Return summary information about the server process.'''
|
||||
return self._getinfo()
|
||||
return self._get_info()
|
||||
|
||||
def rpc_groups(self):
|
||||
'''Return statistics about the session groups.'''
|
||||
@ -353,7 +349,7 @@ class SessionManager(object):
|
||||
|
||||
def rpc_peers(self):
|
||||
'''Return a list of data about server peers.'''
|
||||
return self.controller.peer_mgr.rpc_data()
|
||||
return self.peer_mgr.rpc_data()
|
||||
|
||||
def rpc_sessions(self):
|
||||
'''Return statistics about connected sessions.'''
|
||||
@ -365,7 +361,7 @@ class SessionManager(object):
|
||||
count: number of blocks to reorg (default 3)
|
||||
'''
|
||||
count = non_negative_integer(count)
|
||||
if not self.controller.bp.force_chain_reorg(count):
|
||||
if not self.chain_state.force_chain_reorg(count):
|
||||
raise RPCError(BAD_REQUEST, 'still catching up with daemon')
|
||||
return 'scheduled a reorg of {:,d} blocks'.format(count)
|
||||
|
||||
@ -409,14 +405,10 @@ class SessionManager(object):
|
||||
|
||||
def notify_sessions(self, touched):
|
||||
'''Notify sessions about height changes and touched addresses.'''
|
||||
height = self.controller.bp.db_height
|
||||
# Invalidate caches
|
||||
hc = self.history_cache
|
||||
for hashX in set(hc).intersection(touched):
|
||||
del hc[hashX]
|
||||
|
||||
self.chain_state.invalidate_history_cache(touched)
|
||||
# Height notifications are synchronous. Those sessions with
|
||||
# touched addresses are scheduled for asynchronous completion
|
||||
height = self.chain_state.db_height()
|
||||
for session in self.sessions:
|
||||
if isinstance(session, LocalRPC):
|
||||
continue
|
||||
@ -424,37 +416,6 @@ class SessionManager(object):
|
||||
if session_touched is not None:
|
||||
self.tasks.create_task(session.notify_async(session_touched))
|
||||
|
||||
def raw_header(self, height):
|
||||
'''Return the binary header at the given height.'''
|
||||
header, n = self.controller.bp.read_headers(height, 1)
|
||||
if n != 1:
|
||||
raise RPCError(BAD_REQUEST, f'height {height:,d} out of range')
|
||||
return header
|
||||
|
||||
async def get_history(self, hashX):
|
||||
'''Get history asynchronously to reduce latency.'''
|
||||
if hashX in self.history_cache:
|
||||
return self.history_cache[hashX]
|
||||
|
||||
def job():
|
||||
# History DoS limit. Each element of history is about 99
|
||||
# bytes when encoded as JSON. This limits resource usage
|
||||
# on bloated history requests, and uses a smaller divisor
|
||||
# so large requests are logged before refusing them.
|
||||
limit = self.env.max_send // 97
|
||||
return list(self.controller.bp.get_history(hashX, limit=limit))
|
||||
|
||||
history = await self.tasks.run_in_thread(job)
|
||||
self.history_cache[hashX] = history
|
||||
return history
|
||||
|
||||
async def get_utxos(self, hashX):
|
||||
'''Get UTXOs asynchronously to reduce latency.'''
|
||||
def job():
|
||||
return list(self.controller.bp.get_utxos(hashX, limit=None))
|
||||
|
||||
return await self.tasks.run_in_thread(job)
|
||||
|
||||
async def housekeeping(self):
|
||||
'''Regular housekeeping checks.'''
|
||||
n = 0
|
||||
@ -476,7 +437,7 @@ class SessionManager(object):
|
||||
data = self._session_data(for_log=True)
|
||||
for line in text.sessions_lines(data):
|
||||
self.logger.info(line)
|
||||
self.logger.info(json.dumps(self._getinfo()))
|
||||
self.logger.info(json.dumps(self._get_info()))
|
||||
self.next_log_sessions = time.time() + self.env.log_sessions
|
||||
|
||||
def add_session(self, session):
|
||||
@ -516,16 +477,15 @@ class SessionBase(ServerSession):
|
||||
MAX_CHUNK_SIZE = 2016
|
||||
session_counter = itertools.count()
|
||||
|
||||
def __init__(self, session_mgr, controller, kind):
|
||||
def __init__(self, session_mgr, chain_state, peer_mgr, kind):
|
||||
super().__init__(rpc_protocol=JSONRPCAutoDetect)
|
||||
self.logger = util.class_logger(__name__, self.__class__.__name__)
|
||||
self.session_mgr = session_mgr
|
||||
self.controller = controller
|
||||
self.chain_state = chain_state
|
||||
self.peer_mgr = peer_mgr
|
||||
self.kind = kind # 'RPC', 'TCP' etc.
|
||||
self.bp = controller.bp
|
||||
self.env = controller.env
|
||||
self.env = chain_state.env
|
||||
self.coin = self.env.coin
|
||||
self.daemon = self.bp.daemon
|
||||
self.client = 'unknown'
|
||||
self.anon_logs = self.env.anon_logs
|
||||
self.txs_sent = 0
|
||||
@ -615,6 +575,7 @@ class ElectrumX(SessionBase):
|
||||
self.sv_seen = False
|
||||
self.mempool_statuses = {}
|
||||
self.set_protocol_handlers(self.PROTOCOL_MIN)
|
||||
self.db_height = self.chain_state.db_height
|
||||
|
||||
@classmethod
|
||||
def protocol_min_max_strings(cls):
|
||||
@ -646,7 +607,7 @@ class ElectrumX(SessionBase):
|
||||
async def daemon_request(self, method, *args):
|
||||
'''Catch a DaemonError and convert it to an RPCError.'''
|
||||
try:
|
||||
return await getattr(self.controller.daemon, method)(*args)
|
||||
return await self.chain_state.daemon_request(method, args)
|
||||
except DaemonError as e:
|
||||
raise RPCError(DAEMON_ERROR, f'daemon error: {e}')
|
||||
|
||||
@ -706,25 +667,28 @@ class ElectrumX(SessionBase):
|
||||
|
||||
return None
|
||||
|
||||
def height(self):
|
||||
'''Return the current flushed database height.'''
|
||||
return self.bp.db_height
|
||||
|
||||
def assert_boolean(self, value):
|
||||
'''Return param value it is boolean otherwise raise an RPCError.'''
|
||||
if value in (False, True):
|
||||
return value
|
||||
raise RPCError(BAD_REQUEST, f'{value} should be a boolean value')
|
||||
|
||||
def raw_header(self, height):
|
||||
'''Return the binary header at the given height.'''
|
||||
try:
|
||||
return self.chain_state.raw_header(height)
|
||||
except IndexError:
|
||||
raise RPCError(BAD_REQUEST, f'height {height:,d} out of range')
|
||||
|
||||
def electrum_header(self, height):
|
||||
'''Return the deserialized header at the given height.'''
|
||||
raw_header = self.session_mgr.raw_header(height)
|
||||
raw_header = self.raw_header(height)
|
||||
return self.coin.electrum_header(raw_header, height)
|
||||
|
||||
def subscribe_headers_result(self, height):
|
||||
'''The result of a header subscription for the given height.'''
|
||||
if self.subscribe_headers_raw:
|
||||
raw_header = self.session_mgr.raw_header(height)
|
||||
raw_header = self.raw_header(height)
|
||||
return {'hex': raw_header.hex(), 'height': height}
|
||||
return self.electrum_header(height)
|
||||
|
||||
@ -732,8 +696,8 @@ class ElectrumX(SessionBase):
|
||||
'''Subscribe to get headers of new blocks.'''
|
||||
self.subscribe_headers = True
|
||||
self.subscribe_headers_raw = self.assert_boolean(raw)
|
||||
self.notified_height = self.height()
|
||||
return self.subscribe_headers_result(self.height())
|
||||
self.notified_height = self.db_height()
|
||||
return self.subscribe_headers_result(self.notified_height)
|
||||
|
||||
def headers_subscribe(self):
|
||||
'''Subscribe to get raw headers of new blocks.'''
|
||||
@ -749,12 +713,12 @@ class ElectrumX(SessionBase):
|
||||
|
||||
async def add_peer(self, features):
|
||||
'''Add a peer (but only if the peer resolves to the source).'''
|
||||
peer_mgr = self.controller.peer_mgr
|
||||
peer_mgr = self.peer_mgr
|
||||
return await peer_mgr.on_add_peer(features, self.peer_address())
|
||||
|
||||
def peers_subscribe(self):
|
||||
'''Return the server peers as a list of (ip, host, details) tuples.'''
|
||||
return self.controller.peer_mgr.on_peers_subscribe(self.is_tor())
|
||||
return self.peer_mgr.on_peers_subscribe(self.is_tor())
|
||||
|
||||
async def address_status(self, hashX):
|
||||
'''Returns an address status.
|
||||
@ -763,8 +727,8 @@ class ElectrumX(SessionBase):
|
||||
'''
|
||||
# Note history is ordered and mempool unordered in electrum-server
|
||||
# For mempool, height is -1 if unconfirmed txins, otherwise 0
|
||||
history = await self.session_mgr.get_history(hashX)
|
||||
mempool = await self.controller.mempool_transactions(hashX)
|
||||
history = await self.chain_state.get_history(hashX)
|
||||
mempool = await self.chain_state.mempool_transactions(hashX)
|
||||
|
||||
status = ''.join('{}:{:d}:'.format(hash_to_hex_str(tx_hash), height)
|
||||
for tx_hash, height in history)
|
||||
@ -785,10 +749,10 @@ class ElectrumX(SessionBase):
|
||||
async def hashX_listunspent(self, hashX):
|
||||
'''Return the list of UTXOs of a script hash, including mempool
|
||||
effects.'''
|
||||
utxos = await self.session_mgr.get_utxos(hashX)
|
||||
utxos = await self.chain_state.get_utxos(hashX)
|
||||
utxos = sorted(utxos)
|
||||
utxos.extend(self.controller.mempool.get_utxos(hashX))
|
||||
spends = await self.controller.mempool.potential_spends(hashX)
|
||||
utxos.extend(self.chain_state.mempool_get_utxos(hashX))
|
||||
spends = await self.chain_state.mempool_potential_spends(hashX)
|
||||
|
||||
return [{'tx_hash': hash_to_hex_str(utxo.tx_hash),
|
||||
'tx_pos': utxo.tx_pos,
|
||||
@ -802,7 +766,7 @@ class ElectrumX(SessionBase):
|
||||
raise RPCError(BAD_REQUEST, 'your address subscription limit '
|
||||
f'{self.max_subs:,d} reached')
|
||||
|
||||
# Now let the controller check its limit
|
||||
# Now let the session manager check its limit
|
||||
self.session_mgr.new_subscription()
|
||||
self.hashX_subs[hashX] = alias
|
||||
return await self.address_status(hashX)
|
||||
@ -842,9 +806,9 @@ class ElectrumX(SessionBase):
|
||||
return await self.hashX_subscribe(hashX, address)
|
||||
|
||||
async def get_balance(self, hashX):
|
||||
utxos = await self.session_mgr.get_utxos(hashX)
|
||||
utxos = await self.chain_state.get_utxos(hashX)
|
||||
confirmed = sum(utxo.value for utxo in utxos)
|
||||
unconfirmed = self.controller.mempool_value(hashX)
|
||||
unconfirmed = self.chain_state.mempool_value(hashX)
|
||||
return {'confirmed': confirmed, 'unconfirmed': unconfirmed}
|
||||
|
||||
async def scripthash_get_balance(self, scripthash):
|
||||
@ -855,13 +819,13 @@ class ElectrumX(SessionBase):
|
||||
async def unconfirmed_history(self, hashX):
|
||||
# Note unconfirmed history is unordered in electrum-server
|
||||
# Height is -1 if unconfirmed txins, otherwise 0
|
||||
mempool = await self.controller.mempool_transactions(hashX)
|
||||
mempool = await self.chain_state.mempool_transactions(hashX)
|
||||
return [{'tx_hash': tx_hash, 'height': -unconfirmed, 'fee': fee}
|
||||
for tx_hash, fee, unconfirmed in mempool]
|
||||
|
||||
async def confirmed_and_unconfirmed_history(self, hashX):
|
||||
# Note history is ordered but unconfirmed is unordered in e-s
|
||||
history = await self.session_mgr.get_history(hashX)
|
||||
history = await self.chain_state.get_history(hashX)
|
||||
conf = [{'tx_hash': hash_to_hex_str(tx_hash), 'height': height}
|
||||
for tx_hash, height in history]
|
||||
return conf + await self.unconfirmed_history(hashX)
|
||||
@ -889,13 +853,14 @@ class ElectrumX(SessionBase):
|
||||
return await self.hashX_subscribe(hashX, scripthash)
|
||||
|
||||
def _merkle_proof(self, cp_height, height):
|
||||
max_height = self.height()
|
||||
max_height = self.db_height()
|
||||
if not height <= cp_height <= max_height:
|
||||
raise RPCError(BAD_REQUEST,
|
||||
f'require header height {height:,d} <= '
|
||||
f'cp_height {cp_height:,d} <= '
|
||||
f'chain height {max_height:,d}')
|
||||
branch, root = self.bp.header_mc.branch_and_root(cp_height + 1, height)
|
||||
branch, root = self.chain_state.header_branch_and_root(
|
||||
cp_height + 1, height)
|
||||
return {
|
||||
'branch': [hash_to_hex_str(elt) for elt in branch],
|
||||
'root': hash_to_hex_str(root),
|
||||
@ -906,7 +871,7 @@ class ElectrumX(SessionBase):
|
||||
dictionary with a merkle proof.'''
|
||||
height = non_negative_integer(height)
|
||||
cp_height = non_negative_integer(cp_height)
|
||||
raw_header_hex = self.session_mgr.raw_header(height).hex()
|
||||
raw_header_hex = self.raw_header(height).hex()
|
||||
if cp_height == 0:
|
||||
return raw_header_hex
|
||||
result = {'header': raw_header_hex}
|
||||
@ -932,7 +897,7 @@ class ElectrumX(SessionBase):
|
||||
|
||||
max_size = self.MAX_CHUNK_SIZE
|
||||
count = min(count, max_size)
|
||||
headers, count = self.bp.read_headers(start_height, count)
|
||||
headers, count = self.chain_state.read_headers(start_height, count)
|
||||
result = {'hex': headers.hex(), 'count': count, 'max': max_size}
|
||||
if count and cp_height:
|
||||
last_height = start_height + count - 1
|
||||
@ -947,9 +912,9 @@ class ElectrumX(SessionBase):
|
||||
|
||||
index: the chunk index'''
|
||||
index = non_negative_integer(index)
|
||||
chunk_size = self.coin.CHUNK_SIZE
|
||||
start_height = index * chunk_size
|
||||
headers, count = self.bp.read_headers(start_height, chunk_size)
|
||||
size = self.coin.CHUNK_SIZE
|
||||
start_height = index * size
|
||||
headers, count = self.chain_state.read_headers(start_height, size)
|
||||
return headers.hex()
|
||||
|
||||
def block_get_header(self, height):
|
||||
@ -962,7 +927,7 @@ class ElectrumX(SessionBase):
|
||||
def is_tor(self):
|
||||
'''Try to detect if the connection is to a tor hidden service we are
|
||||
running.'''
|
||||
peername = self.controller.peer_mgr.proxy_peername()
|
||||
peername = self.peer_mgr.proxy_peername()
|
||||
if not peername:
|
||||
return False
|
||||
peer_address = self.peer_address()
|
||||
@ -1010,7 +975,7 @@ class ElectrumX(SessionBase):
|
||||
|
||||
def mempool_get_fee_histogram(self):
|
||||
'''Memory pool fee histogram.'''
|
||||
return self.controller.mempool.get_fee_histogram()
|
||||
return self.chain_state.mempool_fee_histogram()
|
||||
|
||||
async def relayfee(self):
|
||||
'''The minimum fee a low-priority tx must pay in order to be accepted
|
||||
@ -1073,7 +1038,7 @@ class ElectrumX(SessionBase):
|
||||
raw_tx: the raw transaction as a hexadecimal string'''
|
||||
# This returns errors as JSON RPC errors, as is natural
|
||||
try:
|
||||
tx_hash = await self.daemon.sendrawtransaction([raw_tx])
|
||||
tx_hash = await self.chain_state.broadcast_transaction(raw_tx)
|
||||
self.txs_sent += 1
|
||||
self.session_mgr.txs_sent += 1
|
||||
self.logger.info('sent tx: {}'.format(tx_hash))
|
||||
@ -1117,7 +1082,7 @@ class ElectrumX(SessionBase):
|
||||
tx_pos: index of transaction in tx_hashes to create branch for
|
||||
'''
|
||||
hashes = [hex_str_to_hash(hash) for hash in tx_hashes]
|
||||
branch, root = self.bp.merkle.branch_and_root(hashes, tx_pos)
|
||||
branch, root = self.chain_state.tx_branch_and_root(hashes, tx_pos)
|
||||
branch = [hash_to_hex_str(hash) for hash in branch]
|
||||
return branch
|
||||
|
||||
@ -1254,10 +1219,11 @@ class DashElectrumX(ElectrumX):
|
||||
})
|
||||
|
||||
async def notify_masternodes_async(self):
|
||||
for masternode in self.mns:
|
||||
status = await self.daemon.masternode_list(['status', masternode])
|
||||
for mn in self.mns:
|
||||
status = await self.daemon_request('masternode_list',
|
||||
['status', mn])
|
||||
self.send_notification('masternode.subscribe',
|
||||
[masternode, status.get(masternode)])
|
||||
[mn, status.get(mn)])
|
||||
|
||||
def notify(self, height, touched):
|
||||
'''Notify the client about changes in masternode list.'''
|
||||
@ -1274,7 +1240,8 @@ class DashElectrumX(ElectrumX):
|
||||
|
||||
signmnb: signed masternode broadcast message.'''
|
||||
try:
|
||||
return await self.daemon.masternode_broadcast(['relay', signmnb])
|
||||
return await self.daemon_request('masternode_broadcast',
|
||||
['relay', signmnb])
|
||||
except DaemonError as e:
|
||||
error, = e.args
|
||||
message = error['message']
|
||||
@ -1287,7 +1254,8 @@ class DashElectrumX(ElectrumX):
|
||||
|
||||
collateral: masternode collateral.
|
||||
'''
|
||||
result = await self.daemon.masternode_list(['status', collateral])
|
||||
result = await self.daemon_request('masternode_list',
|
||||
['status', collateral])
|
||||
if result is not None:
|
||||
self.mns.add(collateral)
|
||||
return result.get(collateral)
|
||||
@ -1350,8 +1318,9 @@ class DashElectrumX(ElectrumX):
|
||||
# with the masternode information including the payment
|
||||
# position is returned.
|
||||
cache = self.session_mgr.mn_cache
|
||||
if not cache or self.session_mgr.mn_cache_height != self.height():
|
||||
full_mn_list = await self.daemon.masternode_list(['full'])
|
||||
if not cache or self.session_mgr.mn_cache_height != self.db_height():
|
||||
full_mn_list = await self.daemon_request('masternode_list',
|
||||
['full'])
|
||||
mn_payment_queue = get_masternode_payment_queue(full_mn_list)
|
||||
mn_payment_count = len(mn_payment_queue)
|
||||
mn_list = []
|
||||
@ -1377,7 +1346,7 @@ class DashElectrumX(ElectrumX):
|
||||
mn_list.append(mn_info)
|
||||
cache.clear()
|
||||
cache.extend(mn_list)
|
||||
self.session_mgr.mn_cache_height = self.height()
|
||||
self.session_mgr.mn_cache_height = self.db_height()
|
||||
|
||||
# If payees is an empty list the whole masternode list is returned
|
||||
if payees:
|
||||
|
||||
Loading…
Reference in New Issue
Block a user