Rework mempool and BP<->prefretcher communication
This commit is contained in:
parent
431989f0ea
commit
ca3ba2b2d8
@ -23,12 +23,17 @@ from electrumx.lib.util import chunks, formatted_time, class_logger
|
||||
import electrumx.server.db
|
||||
|
||||
|
||||
RAW_BLOCKS, PREFETCHER_CAUGHT_UP, REORG_CHAIN = range(3)
|
||||
|
||||
|
||||
class Prefetcher(object):
|
||||
'''Prefetches blocks (in the forward direction only).'''
|
||||
|
||||
def __init__(self, bp):
|
||||
def __init__(self, daemon, coin, queue):
|
||||
self.logger = class_logger(__name__, self.__class__.__name__)
|
||||
self.bp = bp
|
||||
self.daemon = daemon
|
||||
self.coin = coin
|
||||
self.queue = queue
|
||||
self.caught_up = False
|
||||
# Access to fetched_height should be protected by the semaphore
|
||||
self.fetched_height = None
|
||||
@ -58,18 +63,19 @@ class Prefetcher(object):
|
||||
if self.cache_size < self.min_cache_size:
|
||||
self.refill_event.set()
|
||||
|
||||
async def reset_height(self):
|
||||
async def reset_height(self, height):
|
||||
'''Reset to prefetch blocks from the block processor's height.
|
||||
|
||||
Used in blockchain reorganisations. This coroutine can be
|
||||
called asynchronously to the _prefetch coroutine so we must
|
||||
synchronize with a semaphore.'''
|
||||
called asynchronously to the _prefetch_blocks coroutine so we
|
||||
must synchronize with a semaphore.
|
||||
'''
|
||||
async with self.semaphore:
|
||||
self.fetched_height = self.bp.height
|
||||
self.fetched_height = height
|
||||
self.refill_event.set()
|
||||
|
||||
daemon_height = await self.bp.daemon.height()
|
||||
behind = daemon_height - self.bp.height
|
||||
daemon_height = await self.daemon.height()
|
||||
behind = daemon_height - height
|
||||
if behind > 0:
|
||||
self.logger.info('catching up to daemon height {:,d} '
|
||||
'({:,d} blocks behind)'
|
||||
@ -83,8 +89,8 @@ class Prefetcher(object):
|
||||
|
||||
Repeats until the queue is full or caught up.
|
||||
'''
|
||||
daemon = self.bp.daemon
|
||||
daemon_height = await daemon.height(self.bp._caught_up_event.is_set())
|
||||
daemon = self.daemon
|
||||
daemon_height = await daemon.height()
|
||||
async with self.semaphore:
|
||||
while self.cache_size < self.min_cache_size:
|
||||
# Try and catch up all blocks but limit to room in cache.
|
||||
@ -96,7 +102,7 @@ class Prefetcher(object):
|
||||
if not count:
|
||||
if not self.caught_up:
|
||||
self.caught_up = True
|
||||
self.bp.on_prefetcher_first_caught_up()
|
||||
await self.queue.put((PREFETCHER_CAUGHT_UP, ))
|
||||
return False
|
||||
|
||||
first = self.fetched_height + 1
|
||||
@ -110,7 +116,7 @@ class Prefetcher(object):
|
||||
|
||||
# Special handling for genesis block
|
||||
if first == 0:
|
||||
blocks[0] = self.bp.coin.genesis_block(blocks[0])
|
||||
blocks[0] = self.coin.genesis_block(blocks[0])
|
||||
self.logger.info('verified genesis block with hash {}'
|
||||
.format(hex_hashes[0]))
|
||||
|
||||
@ -121,7 +127,7 @@ class Prefetcher(object):
|
||||
else:
|
||||
self.ave_size = (size + (10 - count) * self.ave_size) // 10
|
||||
|
||||
self.bp.on_prefetched_blocks(blocks, first)
|
||||
await self.queue.put((RAW_BLOCKS, blocks, first))
|
||||
self.cache_size += size
|
||||
self.fetched_height += count
|
||||
|
||||
@ -152,9 +158,10 @@ class BlockProcessor(electrumx.server.db.DB):
|
||||
self.tasks = tasks
|
||||
self.daemon = daemon
|
||||
|
||||
# Work queue
|
||||
self.queue = asyncio.Queue()
|
||||
self._caught_up_event = asyncio.Event()
|
||||
self.task_queue = asyncio.Queue()
|
||||
self.prefetcher = Prefetcher(self)
|
||||
self.prefetcher = Prefetcher(daemon, env.coin, self.queue)
|
||||
|
||||
# Meta
|
||||
self.cache_MB = env.cache_MB
|
||||
@ -185,17 +192,6 @@ class BlockProcessor(electrumx.server.db.DB):
|
||||
'''Add the task to our task queue.'''
|
||||
self.task_queue.put_nowait(task)
|
||||
|
||||
def on_prefetched_blocks(self, blocks, first):
|
||||
'''Called by the prefetcher when it has prefetched some blocks.'''
|
||||
self.add_task(partial(self.check_and_advance_blocks, blocks, first))
|
||||
|
||||
def on_prefetcher_first_caught_up(self):
|
||||
'''Called by the prefetcher when it first catches up.'''
|
||||
# Process after prior tasks (blocks) are completed.
|
||||
async def set_event():
|
||||
self._caught_up_event.set()
|
||||
self.add_task(set_event)
|
||||
|
||||
def add_new_block_callback(self, callback):
|
||||
'''Add a function called when a new block is found.
|
||||
|
||||
@ -247,7 +243,7 @@ class BlockProcessor(electrumx.server.db.DB):
|
||||
# just to reset the prefetcher and try again.
|
||||
self.logger.warning('daemon blocks do not form a chain; '
|
||||
'resetting the prefetcher')
|
||||
await self.prefetcher.reset_height()
|
||||
await self.prefetcher.reset_height(self.height)
|
||||
|
||||
async def reorg_chain(self, count=None):
|
||||
'''Handle a chain reorganisation.
|
||||
@ -280,7 +276,7 @@ class BlockProcessor(electrumx.server.db.DB):
|
||||
last -= len(raw_blocks)
|
||||
# Truncate header_mc: header count is 1 more than the height
|
||||
self.header_mc.truncate(self.height + 1)
|
||||
await self.prefetcher.reset_height()
|
||||
await self.prefetcher.reset_height(self.height)
|
||||
|
||||
async def reorg_hashes(self, count):
|
||||
'''Return a pair (start, hashes) of blocks to back up during a
|
||||
@ -760,8 +756,15 @@ class BlockProcessor(electrumx.server.db.DB):
|
||||
async def _process_queue(self):
|
||||
'''Loop forever processing enqueued work.'''
|
||||
while True:
|
||||
task = await self.task_queue.get()
|
||||
await task()
|
||||
work, *args = await self.queue.get()
|
||||
if work == RAW_BLOCKS:
|
||||
raw_blocks, first = args
|
||||
await self.check_and_advance_blocks(raw_blocks, first)
|
||||
elif work == PREFETCHER_CAUGHT_UP:
|
||||
self._caught_up_event.set()
|
||||
elif work == REORG_CHAIN:
|
||||
count, = args
|
||||
await self.reorg_chain(count)
|
||||
|
||||
def _on_dbs_opened(self):
|
||||
# An incomplete compaction needs to be cancelled otherwise
|
||||
@ -790,7 +793,7 @@ class BlockProcessor(electrumx.server.db.DB):
|
||||
self._on_dbs_opened()
|
||||
# Get the prefetcher running
|
||||
self.tasks.create_task(self.prefetcher.main_loop())
|
||||
await self.prefetcher.reset_height()
|
||||
await self.prefetcher.reset_height(self.height)
|
||||
# Start our loop that processes blocks as they are fetched
|
||||
self.worker_task = self.tasks.create_task(self._process_queue())
|
||||
# Wait until caught up
|
||||
@ -816,7 +819,7 @@ class BlockProcessor(electrumx.server.db.DB):
|
||||
Returns True if a reorg is queued, false if not caught up.
|
||||
'''
|
||||
if self._caught_up_event.is_set():
|
||||
self.add_task(partial(self.reorg_chain, count=count))
|
||||
self.queue.put_nowait((REORG_CHAIN, count))
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
@ -6,6 +6,7 @@
|
||||
# and warranty status of this software.
|
||||
|
||||
|
||||
import asyncio
|
||||
import pylru
|
||||
|
||||
from electrumx.server.mempool import MemPool
|
||||
@ -108,5 +109,9 @@ class ChainState(object):
|
||||
|
||||
async def wait_for_mempool(self):
|
||||
await self.bp.catch_up_to_daemon()
|
||||
self.tasks.create_task(self.mempool.main_loop())
|
||||
await self.mempool.synchronized_event.wait()
|
||||
# Tell the daemon to fetch the mempool going forwards, trigger
|
||||
# an initial fetch, and wait for the mempool to synchronize
|
||||
mempool_refresh_event = asyncio.Event()
|
||||
daemon._mempool_refresh_event = mempool_refresh_event
|
||||
self.tasks.create_task(self.daemon.height())
|
||||
await self.mempool.start_and_wait(mempool_refresh_event)
|
||||
|
||||
@ -41,7 +41,7 @@ class Daemon(object):
|
||||
self.set_urls(env.coin.daemon_urls(env.daemon_url))
|
||||
self._height = None
|
||||
self._mempool_hashes = set()
|
||||
self.mempool_refresh_event = asyncio.Event()
|
||||
self._mempool_refresh_event = None
|
||||
# Limit concurrent RPC calls to this number.
|
||||
# See DEFAULT_HTTP_WORKQUEUE in bitcoind, which is typically 16
|
||||
self.workqueue_semaphore = asyncio.Semaphore(value=10)
|
||||
@ -281,12 +281,12 @@ class Daemon(object):
|
||||
'''Broadcast a transaction to the network.'''
|
||||
return await self._send_single('sendrawtransaction', params)
|
||||
|
||||
async def height(self, mempool=False):
|
||||
async def height(self):
|
||||
'''Query the daemon for its current height.'''
|
||||
self._height = await self._send_single('getblockcount')
|
||||
if mempool:
|
||||
if self._mempool_refresh_event:
|
||||
self._mempool_hashes = set(await self.mempool_hashes())
|
||||
self.mempool_refresh_event.set()
|
||||
self._mempool_refresh_event.set()
|
||||
return self._height
|
||||
|
||||
def cached_mempool_hashes(self):
|
||||
|
||||
@ -41,12 +41,21 @@ class MemPool(object):
|
||||
self.stop = False
|
||||
self.txs = {}
|
||||
self.hashXs = defaultdict(set) # None can be a key
|
||||
self.synchronized_event = asyncio.Event()
|
||||
self.fee_histogram = defaultdict(int)
|
||||
self.compact_fee_histogram = []
|
||||
self.histogram_time = 0
|
||||
add_new_block_callback(self.on_new_block)
|
||||
|
||||
async def start_and_wait(self, mempool_refresh_event):
|
||||
'''Creates the mempool synchronization task, and waits for it to
|
||||
first synchronize before returning.'''
|
||||
self.logger.info('beginning processing of daemon mempool. '
|
||||
'This can take some time...')
|
||||
synchronized = asyncio.Event()
|
||||
self.tasks.create_task(self._synchronize(
|
||||
mempool_refresh_event, synchronized))
|
||||
await synchronized.wait()
|
||||
|
||||
def _resync_daemon_hashes(self, unprocessed, unfetched):
|
||||
'''Re-sync self.txs with the list of hashes in the daemon's mempool.
|
||||
|
||||
@ -83,21 +92,17 @@ class MemPool(object):
|
||||
for hex_hash in new:
|
||||
txs[hex_hash] = None
|
||||
|
||||
async def main_loop(self):
|
||||
async def _synchronize(self, mempool_refresh_event, synchronized):
|
||||
'''Asynchronously maintain mempool status with daemon.
|
||||
|
||||
Processes the mempool each time the daemon's mempool refresh
|
||||
event is signalled.
|
||||
Processes the mempool each time the mempool refresh event is
|
||||
signalled.
|
||||
'''
|
||||
unprocessed = {}
|
||||
unfetched = set()
|
||||
txs = self.txs
|
||||
fetch_size = 800
|
||||
process_some = self._async_process_some(fetch_size // 2)
|
||||
|
||||
self.logger.info('beginning processing of daemon mempool. '
|
||||
'This can take some time...')
|
||||
await self.chain_state.mempool_refresh_event.wait()
|
||||
next_log = 0
|
||||
loops = -1 # Zero during initial catchup
|
||||
|
||||
@ -116,7 +121,7 @@ class MemPool(object):
|
||||
if not todo:
|
||||
loops += 1
|
||||
if loops > 0:
|
||||
self.synchronized_event.set()
|
||||
synchronized.set()
|
||||
now = time.time()
|
||||
if now >= next_log and loops:
|
||||
self.logger.info('{:,d} txs touching {:,d} addresses'
|
||||
@ -125,10 +130,10 @@ class MemPool(object):
|
||||
|
||||
try:
|
||||
if not todo:
|
||||
await self.chain_state.mempool_refresh_event.wait()
|
||||
await mempool_refresh_event.wait()
|
||||
|
||||
self._resync_daemon_hashes(unprocessed, unfetched)
|
||||
self.chain_state.mempool_refresh_event.clear()
|
||||
mempool_refresh_event.clear()
|
||||
|
||||
if unfetched:
|
||||
count = min(len(unfetched), fetch_size)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user