Rework mempool and notification code

Clarifies the mempool interface to look more like what it
would in its own process
This commit is contained in:
Neil Booth 2018-07-21 08:39:56 +08:00
parent 3e8c413b77
commit c715ae6249
7 changed files with 146 additions and 120 deletions

View File

@ -14,6 +14,7 @@ Version 1.6.1 (in progress)
============================ ============================
* cleaner shutdown process with clear guarantees * cleaner shutdown process with clear guarantees
* cleaner mempool and notification handling
* aiohttp min version requirement raised to 2.0 * aiohttp min version requirement raised to 2.0
* onion peers are ignored if no tor proxy is available * onion peers are ignored if no tor proxy is available
* add Motion coin (ocruzv) * add Motion coin (ocruzv)

View File

@ -152,11 +152,12 @@ class BlockProcessor(electrumx.server.db.DB):
Coordinate backing up in case of chain reorganisations. Coordinate backing up in case of chain reorganisations.
''' '''
def __init__(self, env, tasks, daemon): def __init__(self, env, tasks, daemon, notifications):
super().__init__(env) super().__init__(env)
self.tasks = tasks self.tasks = tasks
self.daemon = daemon self.daemon = daemon
self.notifications = notifications
# Work queue # Work queue
self.queue = asyncio.Queue() self.queue = asyncio.Queue()
@ -168,7 +169,6 @@ class BlockProcessor(electrumx.server.db.DB):
self.next_cache_check = 0 self.next_cache_check = 0
self.last_flush = time.time() self.last_flush = time.time()
self.touched = set() self.touched = set()
self.callbacks = []
# Header merkle cache # Header merkle cache
self.merkle = Merkle() self.merkle = Merkle()
@ -226,9 +226,9 @@ class BlockProcessor(electrumx.server.db.DB):
self.logger.info('processed {:,d} block{} in {:.1f}s' self.logger.info('processed {:,d} block{} in {:.1f}s'
.format(len(blocks), s, .format(len(blocks), s,
time.time() - start)) time.time() - start))
for callback in self.callbacks: if self._caught_up_event.is_set():
callback(self.touched) await self.notifications.on_block(self.touched, self.height)
self.touched.clear() self.touched = set()
elif hprevs[0] != chain[0]: elif hprevs[0] != chain[0]:
await self.reorg_chain() await self.reorg_chain()
else: else:
@ -758,6 +758,8 @@ class BlockProcessor(electrumx.server.db.DB):
await self.check_and_advance_blocks(raw_blocks, first) await self.check_and_advance_blocks(raw_blocks, first)
elif work == PREFETCHER_CAUGHT_UP: elif work == PREFETCHER_CAUGHT_UP:
self._caught_up_event.set() self._caught_up_event.set()
# Initialise the notification framework
await self.notifications.on_block(set(), self.height)
elif work == REORG_CHAIN: elif work == REORG_CHAIN:
count, = args count, = args
await self.reorg_chain(count) await self.reorg_chain(count)

View File

@ -17,18 +17,17 @@ class ChainState(object):
blocks, transaction history, UTXOs and the mempool. blocks, transaction history, UTXOs and the mempool.
''' '''
def __init__(self, env, tasks): def __init__(self, env, tasks, notifications):
self._env = env self._env = env
self._tasks = tasks self._tasks = tasks
self._daemon = env.coin.DAEMON(env) self._daemon = env.coin.DAEMON(env)
BlockProcessor = env.coin.BLOCK_PROCESSOR BlockProcessor = env.coin.BLOCK_PROCESSOR
self._bp = BlockProcessor(env, tasks, self._daemon) self._bp = BlockProcessor(env, tasks, self._daemon, notifications)
self._mempool = MemPool(env.coin, self, tasks, self._mempool = MemPool(env.coin, self, tasks, notifications)
self._bp.add_new_block_callback)
self._history_cache = pylru.lrucache(256) self._history_cache = pylru.lrucache(256)
# External interface: pass-throughs for mempool.py # External interface: pass-throughs for mempool.py
self.cached_mempool_hashes = self._daemon.cached_mempool_hashes self.cached_height = self._daemon.cached_height
self.getrawtransactions = self._daemon.getrawtransactions self.getrawtransactions = self._daemon.getrawtransactions
self.utxo_lookup = self._bp.db_utxo_lookup self.utxo_lookup = self._bp.db_utxo_lookup
# External interface pass-throughs for session.py # External interface pass-throughs for session.py
@ -44,7 +43,7 @@ class ChainState(object):
async def broadcast_transaction(self, raw_tx): async def broadcast_transaction(self, raw_tx):
return await self._daemon.sendrawtransaction([raw_tx]) return await self._daemon.sendrawtransaction([raw_tx])
async def daemon_request(self, method, args): async def daemon_request(self, method, args=()):
return await getattr(self._daemon, method)(*args) return await getattr(self._daemon, method)(*args)
def db_height(self): def db_height(self):
@ -109,9 +108,4 @@ class ChainState(object):
async def wait_for_mempool(self): async def wait_for_mempool(self):
await self._bp.catch_up_to_daemon() await self._bp.catch_up_to_daemon()
# Tell the daemon to fetch the mempool going forwards, trigger await self._mempool.start_and_wait_for_sync()
# an initial fetch, and wait for the mempool to synchronize
mempool_refresh_event = asyncio.Event()
self._daemon._mempool_refresh_event = mempool_refresh_event
self._tasks.create_task(self._daemon.height())
await self._mempool.start_and_wait(mempool_refresh_event)

View File

@ -15,6 +15,58 @@ from electrumx.server.peers import PeerManager
from electrumx.server.session import SessionManager from electrumx.server.session import SessionManager
class Notifications(object):
# hashX notifications come from two sources: new blocks and
# mempool refreshes. The logic in daemon.py only gets new mempool
# hashes after getting the latest height.
#
# A user with a pending transaction is notified after the block it
# gets in is processed. Block processing can take an extended
# time, and any mempool refreshes during that time will not have
# the transaction in the mempool any more, which would cause a
# redundant notification. To avoid this, mempool touches are not
# notified whilst a block is being processed, but combined with
# the block notification when it is made. We do not pause mempool
# processing
def __init__(self):
self._touched_mp = {}
self._touched_bp = {}
self._highest_block = 0
async def _maybe_notify(self):
tmp, tbp = self._touched_mp, self._touched_bp
common = set(tmp).intersection(tbp)
if common:
height = max(common)
elif tmp and max(tmp) == self._highest_block:
height = self._highest_block
else:
# Either we are processing a block and waiting for it to
# come in, or we have had no mempool update for the
# current block
return
touched = tmp.pop(height)
touched.update(tbp.pop(height, set()))
for old in [h for h in tmp if h <= height]:
del tmp[old]
for old in [h for h in tbp if h <= height]:
del tbp[old]
await self.notify_sessions(touched, height)
async def on_mempool(self, touched, height):
self._touched_mp[height] = touched
await self._maybe_notify()
async def on_block(self, touched, height):
self._touched_bp[height] = touched
self._highest_block = height
await self._maybe_notify()
async def notify_sessions(self, touched, height):
pass
class Controller(ServerBase): class Controller(ServerBase):
'''Manages server initialisation and stutdown. '''Manages server initialisation and stutdown.
@ -39,10 +91,12 @@ class Controller(ServerBase):
self.logger.info(f'event loop policy: {env.loop_policy}') self.logger.info(f'event loop policy: {env.loop_policy}')
self.logger.info(f'reorg limit is {env.reorg_limit:,d} blocks') self.logger.info(f'reorg limit is {env.reorg_limit:,d} blocks')
self.chain_state = ChainState(env, self.tasks) notifications = Notifications()
self.chain_state = ChainState(env, self.tasks, notifications)
self.peer_mgr = PeerManager(env, self.tasks, self.chain_state) self.peer_mgr = PeerManager(env, self.tasks, self.chain_state)
self.session_mgr = SessionManager(env, self.tasks, self.chain_state, self.session_mgr = SessionManager(env, self.tasks, self.chain_state,
self.peer_mgr, self.shutdown_event) self.peer_mgr, notifications,
self.shutdown_event)
async def start_servers(self): async def start_servers(self):
'''Start the RPC server and wait for the mempool to synchronize. Then '''Start the RPC server and wait for the mempool to synchronize. Then

View File

@ -40,8 +40,6 @@ class Daemon(object):
self.coin = env.coin self.coin = env.coin
self.set_urls(env.coin.daemon_urls(env.daemon_url)) self.set_urls(env.coin.daemon_urls(env.daemon_url))
self._height = None self._height = None
self._mempool_hashes = set()
self._mempool_refresh_event = None
# Limit concurrent RPC calls to this number. # Limit concurrent RPC calls to this number.
# See DEFAULT_HTTP_WORKQUEUE in bitcoind, which is typically 16 # See DEFAULT_HTTP_WORKQUEUE in bitcoind, which is typically 16
self.workqueue_semaphore = asyncio.Semaphore(value=10) self.workqueue_semaphore = asyncio.Semaphore(value=10)
@ -275,15 +273,8 @@ class Daemon(object):
async def height(self): async def height(self):
'''Query the daemon for its current height.''' '''Query the daemon for its current height.'''
self._height = await self._send_single('getblockcount') self._height = await self._send_single('getblockcount')
if self._mempool_refresh_event:
self._mempool_hashes = set(await self.mempool_hashes())
self._mempool_refresh_event.set()
return self._height return self._height
def cached_mempool_hashes(self):
'''Return the cached mempool hashes.'''
return self._mempool_hashes
def cached_height(self): def cached_height(self):
'''Return the cached daemon height. '''Return the cached daemon height.

View File

@ -14,7 +14,6 @@ from collections import defaultdict
from electrumx.lib.hash import hash_to_hex_str, hex_str_to_hash from electrumx.lib.hash import hash_to_hex_str, hex_str_to_hash
from electrumx.lib.util import class_logger from electrumx.lib.util import class_logger
from electrumx.server.daemon import DaemonError
from electrumx.server.db import UTXO, DB from electrumx.server.db import UTXO, DB
@ -32,31 +31,91 @@ class MemPool(object):
A pair is a (hashX, value) tuple. tx hashes are hex strings. A pair is a (hashX, value) tuple. tx hashes are hex strings.
''' '''
def __init__(self, coin, chain_state, tasks, add_new_block_callback): def __init__(self, coin, chain_state, tasks, notifications):
self.logger = class_logger(__name__, self.__class__.__name__) self.logger = class_logger(__name__, self.__class__.__name__)
self.coin = coin self.coin = coin
self.chain_state = chain_state self.chain_state = chain_state
self.tasks = tasks self.tasks = tasks
self.touched = set() self.notifications = notifications
self.stop = False
self.txs = {} self.txs = {}
self.hashXs = defaultdict(set) # None can be a key self.hashXs = defaultdict(set) # None can be a key
self.fee_histogram = defaultdict(int) self.fee_histogram = defaultdict(int)
self.compact_fee_histogram = [] self.compact_fee_histogram = []
self.histogram_time = 0 self.histogram_time = 0
add_new_block_callback(self.on_new_block) self.next_log = 0
async def start_and_wait(self, mempool_refresh_event): async def start_and_wait_for_sync(self):
'''Creates the mempool synchronization task, and waits for it to '''Creates the mempool synchronization task, and waits for it to
first synchronize before returning.''' first synchronize before returning.'''
self.logger.info('beginning processing of daemon mempool. ' self.logger.info('beginning processing of daemon mempool. '
'This can take some time...') 'This can take some time...')
synchronized = asyncio.Event() await self._synchronize(True)
self.tasks.create_task(self._synchronize( self.tasks.create_task(self._synchronize_forever())
mempool_refresh_event, synchronized))
await synchronized.wait()
def _resync_daemon_hashes(self, unprocessed, unfetched): async def _synchronize_forever(self):
while True:
await asyncio.sleep(5)
await self._synchronize(False)
async def _refresh_hashes(self):
'''Return daemon hashes when we're sure which height they are
good for.'''
height = self.chain_state.cached_height()
daemon_request = self.chain_state.daemon_request
while True:
hashes = await daemon_request('mempool_hashes')
later_height = await daemon_request('height')
if height == later_height:
return set(hashes), height
height = later_height
async def _synchronize(self, first_time):
'''Asynchronously maintain mempool status with daemon.
Processes the mempool each time the mempool refresh event is
signalled.
'''
unprocessed = {}
unfetched = set()
touched = set()
txs = self.txs
next_refresh = 0
fetch_size = 800
process_some = self._async_process_some(fetch_size // 2)
while True:
now = time.time()
# If processing a large mempool, a block being found might
# shrink our work considerably, so refresh our view every 20s
if now > next_refresh:
hashes, height = await self._refresh_hashes()
self._resync_hashes(hashes, unprocessed, unfetched, touched)
next_refresh = time.time() + 20
# Log progress of initial sync
todo = len(unfetched) + len(unprocessed)
if first_time:
pct = (len(txs) - todo) * 100 // len(txs) if txs else 0
self.logger.info(f'catchup {pct:d}% complete '
f'({todo:,d} txs left)')
if not todo:
break
# FIXME: parallelize
if unfetched:
count = min(len(unfetched), fetch_size)
hex_hashes = [unfetched.pop() for n in range(count)]
unprocessed.update(await self.fetch_raw_txs(hex_hashes))
if unprocessed:
await process_some(unprocessed, touched)
if now >= self.next_log:
self.logger.info('{:,d} txs touching {:,d} addresses'
.format(len(txs), len(self.hashXs)))
self.next_log = time.time() + 150
await self.notifications.on_mempool(touched, height)
def _resync_hashes(self, hashes, unprocessed, unfetched, touched):
'''Re-sync self.txs with the list of hashes in the daemon's mempool. '''Re-sync self.txs with the list of hashes in the daemon's mempool.
Additionally, remove gone hashes from unprocessed and Additionally, remove gone hashes from unprocessed and
@ -64,10 +123,7 @@ class MemPool(object):
''' '''
txs = self.txs txs = self.txs
hashXs = self.hashXs hashXs = self.hashXs
touched = self.touched
fee_hist = self.fee_histogram fee_hist = self.fee_histogram
hashes = self.chain_state.cached_mempool_hashes()
gone = set(txs).difference(hashes) gone = set(txs).difference(hashes)
for hex_hash in gone: for hex_hash in gone:
unfetched.discard(hex_hash) unfetched.discard(hex_hash)
@ -92,69 +148,12 @@ class MemPool(object):
for hex_hash in new: for hex_hash in new:
txs[hex_hash] = None txs[hex_hash] = None
async def _synchronize(self, mempool_refresh_event, synchronized):
'''Asynchronously maintain mempool status with daemon.
Processes the mempool each time the mempool refresh event is
signalled.
'''
unprocessed = {}
unfetched = set()
txs = self.txs
fetch_size = 800
process_some = self._async_process_some(fetch_size // 2)
next_log = 0
loops = -1 # Zero during initial catchup
while True:
# Avoid double notifications if processing a block
if self.touched and not self.chain_state.processing_new_block():
self.notify_sessions(self.touched)
self.touched.clear()
# Log progress / state
todo = len(unfetched) + len(unprocessed)
if loops == 0:
pct = (len(txs) - todo) * 100 // len(txs) if txs else 0
self.logger.info('catchup {:d}% complete '
'({:,d} txs left)'.format(pct, todo))
if not todo:
loops += 1
if loops > 0:
synchronized.set()
now = time.time()
if now >= next_log and loops:
self.logger.info('{:,d} txs touching {:,d} addresses'
.format(len(txs), len(self.hashXs)))
next_log = now + 150
try:
if not todo:
await mempool_refresh_event.wait()
self._resync_daemon_hashes(unprocessed, unfetched)
mempool_refresh_event.clear()
if unfetched:
count = min(len(unfetched), fetch_size)
hex_hashes = [unfetched.pop() for n in range(count)]
unprocessed.update(await self.fetch_raw_txs(hex_hashes))
if unprocessed:
await process_some(unprocessed)
except DaemonError as e:
self.logger.info('ignoring daemon error: {}'.format(e))
except asyncio.CancelledError:
# This aids clean shutdowns
self.stop = True
break
def _async_process_some(self, limit): def _async_process_some(self, limit):
pending = [] pending = []
txs = self.txs txs = self.txs
fee_hist = self.fee_histogram fee_hist = self.fee_histogram
async def process(unprocessed): async def process(unprocessed, touched):
nonlocal pending nonlocal pending
raw_txs = {} raw_txs = {}
@ -174,7 +173,6 @@ class MemPool(object):
pending.extend(deferred) pending.extend(deferred)
hashXs = self.hashXs hashXs = self.hashXs
touched = self.touched
for hex_hash, item in result.items(): for hex_hash, item in result.items():
if hex_hash in txs: if hex_hash in txs:
txs[hex_hash] = item txs[hex_hash] = item
@ -188,17 +186,6 @@ class MemPool(object):
return process return process
def on_new_block(self, touched):
'''Called after processing one or more new blocks.
Touched is a set of hashXs touched by the transactions in the
block. Caller must be aware it is modified by this function.
'''
# Minor race condition here with mempool processor thread
touched.update(self.touched)
self.touched.clear()
self.notify_sessions(touched)
async def fetch_raw_txs(self, hex_hashes): async def fetch_raw_txs(self, hex_hashes):
'''Fetch a list of mempool transactions.''' '''Fetch a list of mempool transactions.'''
raw_txs = await self.chain_state.getrawtransactions(hex_hashes) raw_txs = await self.chain_state.getrawtransactions(hex_hashes)
@ -241,9 +228,6 @@ class MemPool(object):
utxo_lookup = self.chain_state.utxo_lookup utxo_lookup = self.chain_state.utxo_lookup
for item in pending: for item in pending:
if self.stop:
break
tx_hash, old_txin_pairs, txout_pairs, tx_size = item tx_hash, old_txin_pairs, txout_pairs, tx_size = item
if tx_hash not in txs: if tx_hash not in txs:
continue continue

View File

@ -97,12 +97,14 @@ class SessionManager(object):
CATCHING_UP, LISTENING, PAUSED, SHUTTING_DOWN = range(4) CATCHING_UP, LISTENING, PAUSED, SHUTTING_DOWN = range(4)
def __init__(self, env, tasks, chain_state, peer_mgr, shutdown_event): def __init__(self, env, tasks, chain_state, peer_mgr, notifications,
shutdown_event):
env.max_send = max(350000, env.max_send) env.max_send = max(350000, env.max_send)
self.env = env self.env = env
self.tasks = tasks self.tasks = tasks
self.chain_state = chain_state self.chain_state = chain_state
self.peer_mgr = peer_mgr self.peer_mgr = peer_mgr
self.notifications = notifications
self.shutdown_event = shutdown_event self.shutdown_event = shutdown_event
self.logger = util.class_logger(__name__, self.__class__.__name__) self.logger = util.class_logger(__name__, self.__class__.__name__)
self.servers = {} self.servers = {}
@ -123,8 +125,7 @@ class SessionManager(object):
self.mn_cache = [] self.mn_cache = []
# Event triggered when electrumx is listening for incoming requests. # Event triggered when electrumx is listening for incoming requests.
self.server_listening = asyncio.Event() self.server_listening = asyncio.Event()
# FIXME notifications.notify_sessions = self.notify_sessions
chain_state._mempool.notify_sessions = self.notify_sessions
# Set up the RPC request handlers # Set up the RPC request handlers
cmds = ('add_peer daemon_url disconnect getinfo groups log peers ' cmds = ('add_peer daemon_url disconnect getinfo groups log peers '
'reorg sessions stop'.split()) 'reorg sessions stop'.split())
@ -431,12 +432,11 @@ class SessionManager(object):
'''The number of connections that we've sent something to.''' '''The number of connections that we've sent something to.'''
return len(self.sessions) return len(self.sessions)
def notify_sessions(self, touched): async def notify_sessions(self, touched, height):
'''Notify sessions about height changes and touched addresses.''' '''Notify sessions about height changes and touched addresses.'''
self.chain_state.invalidate_history_cache(touched) self.chain_state.invalidate_history_cache(touched)
# Height notifications are synchronous. Those sessions with # Height notifications are synchronous. Those sessions with
# touched addresses are scheduled for asynchronous completion # touched addresses are scheduled for asynchronous completion
height = self.chain_state.db_height()
for session in self.sessions: for session in self.sessions:
if isinstance(session, LocalRPC): if isinstance(session, LocalRPC):
continue continue