diff --git a/RELEASE-NOTES b/RELEASE-NOTES index 1cff1f0..859abc7 100644 --- a/RELEASE-NOTES +++ b/RELEASE-NOTES @@ -1,3 +1,9 @@ +version 0.9.8 +------------- + +- clean up mempool handling, notify of addresses only once when a new block + comes in. Fixes issue 70. + version 0.9.7 ------------- diff --git a/server/block_processor.py b/server/block_processor.py index a6b6cab..7d364a4 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -85,9 +85,13 @@ class Prefetcher(LoggedClass): async def _prefetch(self): '''Prefetch blocks unless the prefetch queue is full.''' + # Refresh the mempool after updating the daemon height, if and + # only if we've caught up daemon_height = await self.daemon.height() - cache_room = self.target_cache_size // self.ave_size + if self.caught_up: + await self.daemon.refresh_mempool_hashes() + cache_room = self.target_cache_size // self.ave_size with await self.semaphore: # Try and catch up all blocks but limit to room in cache. # Constrain count to between 0 and 4000 regardless @@ -132,7 +136,7 @@ class BlockProcessor(server.db.DB): Coordinate backing up in case of chain reorganisations. 
''' - def __init__(self, env, touched, touched_event): + def __init__(self, env): super().__init__(env) # The block processor reads its tasks from this queue @@ -149,8 +153,6 @@ class BlockProcessor(server.db.DB): self.caught_up = False self._shutdown = False self.event = asyncio.Event() - self.touched = touched - self.touched_event = touched_event # Meta self.utxo_MB = env.utxo_MB @@ -180,7 +182,7 @@ class BlockProcessor(server.db.DB): self.logger.info('flushing history cache at {:,d} MB' .format(self.hist_MB)) - async def main_loop(self): + async def main_loop(self, touched): '''Main loop for block processing.''' # Simulate a reorg if requested @@ -195,7 +197,7 @@ class BlockProcessor(server.db.DB): break blocks = self.prefetcher.get_blocks() if blocks: - await self.advance_blocks(blocks) + await self.advance_blocks(blocks, touched) elif not self.caught_up: self.caught_up = True self.first_caught_up() @@ -209,7 +211,7 @@ class BlockProcessor(server.db.DB): self._shutdown = True self.tasks.put_nowait(None) - async def advance_blocks(self, blocks): + async def advance_blocks(self, blocks, touched): '''Strip the unspendable genesis coinbase.''' if self.height == -1: blocks[0] = blocks[0][:self.coin.HEADER_LEN] + bytes(1) @@ -218,7 +220,7 @@ class BlockProcessor(server.db.DB): for block in blocks: if self._shutdown: break - self.advance_block(block, self.touched) + self.advance_block(block, touched) loop = asyncio.get_event_loop() try: @@ -227,14 +229,13 @@ class BlockProcessor(server.db.DB): else: do_it() except ChainReorg: - await self.handle_chain_reorg(self.touched) + await self.handle_chain_reorg(touched) if self.caught_up: # Flush everything as queries are performed on the DB and # not in-memory. 
await asyncio.sleep(0) self.flush(True) - self.touched_event.set() elif time.time() > self.next_cache_check: self.check_cache_size() self.next_cache_check = time.time() + 60 diff --git a/server/daemon.py b/server/daemon.py index cee7717..42897b3 100644 --- a/server/daemon.py +++ b/server/daemon.py @@ -36,6 +36,8 @@ class Daemon(util.LoggedClass): self.urls = urls self.url_index = 0 self._height = None + self.mempool_hashes = set() + self.mempool_refresh_event = asyncio.Event() # Limit concurrent RPC calls to this number. # See DEFAULT_HTTP_WORKQUEUE in bitcoind, which is typically 16 self.workqueue_semaphore = asyncio.Semaphore(value=10) @@ -148,9 +150,10 @@ class Daemon(util.LoggedClass): # Convert hex string to bytes return [bytes.fromhex(block) for block in blocks] - async def mempool_hashes(self): - '''Return the hashes of the txs in the daemon's mempool.''' - return await self._send_single('getrawmempool') + async def refresh_mempool_hashes(self): + '''Update our record of the daemon's mempool hashes.''' + self.mempool_hashes = set(await self._send_single('getrawmempool')) + self.mempool_refresh_event.set() async def estimatefee(self, params): '''Return the fee estimate for the given parameters.''' diff --git a/server/mempool.py b/server/mempool.py index 35cf702..0984154 100644 --- a/server/mempool.py +++ b/server/mempool.py @@ -33,92 +33,31 @@ class MemPool(util.LoggedClass): A pair is a (hash168, value) tuple. tx hashes are hex strings. ''' - def __init__(self, daemon, coin, db, touched, touched_event): + def __init__(self, daemon, coin, db): super().__init__() self.daemon = daemon self.coin = coin self.db = db - self.touched = touched - self.touched_event = touched_event + self.touched = set() + self.touched_event = asyncio.Event() self.stop = False self.txs = {} self.hash168s = defaultdict(set) # None can be a key - async def main_loop(self, caught_up): - '''Asynchronously maintain mempool status with daemon. 
+ def resync_daemon_hashes(self, unprocessed, unfetched): + '''Re-sync self.txs with the list of hashes in the daemon's mempool. - Waits until the caught up event is signalled.''' - await caught_up.wait() - self.logger.info('beginning processing of daemon mempool. ' - 'This can take some time...') - try: - await self.fetch_and_process() - except asyncio.CancelledError: - # This aids clean shutdowns - self.stop = True - - async def fetch_and_process(self): - '''The inner loop unprotected by try / except.''' - unfetched = set() - unprocessed = {} - log_every = 150 - log_secs = 0 - fetch_size = 400 - process_some = self.async_process_some(unfetched, fetch_size // 2) - next_refresh = 0 - # The list of mempool hashes is fetched no more frequently - # than this number of seconds - refresh_secs = 5 - - while True: - try: - now = time.time() - if now >= next_refresh: - await self.new_hashes(unprocessed, unfetched) - next_refresh = now + refresh_secs - log_secs -= refresh_secs - - # Fetch some txs if unfetched ones remain - if unfetched: - count = min(len(unfetched), fetch_size) - hex_hashes = [unfetched.pop() for n in range(count)] - unprocessed.update(await self.fetch_raw_txs(hex_hashes)) - - # Process some txs if unprocessed ones remain - if unprocessed: - await process_some(unprocessed) - - # Avoid double notifications if processing a block - if self.touched and not self.processing_new_block(): - self.touched_event.set() - - if not unprocessed: - if log_secs <= 0: - log_secs = log_every - self.logger.info('{:,d} txs touching {:,d} addresses' - .format(len(self.txs), - len(self.hash168s))) - await asyncio.sleep(1) - except DaemonError as e: - self.logger.info('ignoring daemon error: {}'.format(e)) - - def processing_new_block(self): - '''Return True if we're processing a new block.''' - return self.daemon.cached_height() > self.db.db_height - - async def new_hashes(self, unprocessed, unfetched): - '''Get the current list of hashes in the daemon's mempool. 
- - Remove ones that have disappeared from self.txs and unprocessed. + Additionally, remove gone hashes from unprocessed and + unfetched. Add new ones to unprocessed. ''' txs = self.txs hash168s = self.hash168s touched = self.touched - hashes = set(await self.daemon.mempool_hashes()) - new = hashes.difference(txs) + hashes = self.daemon.mempool_hashes gone = set(txs).difference(hashes) for hex_hash in gone: + unfetched.discard(hex_hash) unprocessed.pop(hex_hash, None) item = txs.pop(hex_hash) if item: @@ -131,18 +70,71 @@ class MemPool(util.LoggedClass): del hash168s[hash168] touched.update(tx_hash168s) + new = hashes.difference(txs) unfetched.update(new) for hex_hash in new: txs[hex_hash] = None + async def main_loop(self): + '''Asynchronously maintain mempool status with daemon. + + Processes the mempool each time the daemon's mempool refresh + event is signalled. + ''' + unprocessed = {} + unfetched = set() + txs = self.txs + fetch_size = 400 + process_some = self.async_process_some(unfetched, fetch_size // 2) + + await self.daemon.mempool_refresh_event.wait() + self.logger.info ('beginning processing of daemon mempool. 
' + 'This can take some time...') + next_log = time.time() + 0.1 + + while True: + try: + todo = len(unfetched) + len(unprocessed) + if todo: + pct = (len(txs) - todo) * 100 // len(txs) if txs else 0 + self.logger.info('catchup {:d}% complete ({:,d} txs left)' + .format(pct, todo)) + else: + now = time.time() + if now >= next_log: + self.logger.info('{:,d} txs touching {:,d} addresses' + .format(len(txs), len(self.hash168s))) + next_log = now + 150 + await self.daemon.mempool_refresh_event.wait() + + self.resync_daemon_hashes(unprocessed, unfetched) + self.daemon.mempool_refresh_event.clear() + + if unfetched: + count = min(len(unfetched), fetch_size) + hex_hashes = [unfetched.pop() for n in range(count)] + unprocessed.update(await self.fetch_raw_txs(hex_hashes)) + + if unprocessed: + await process_some(unprocessed) + + # Avoid double notifications if processing a block + if self.touched and not self.processing_new_block(): + self.touched_event.set() + except DaemonError as e: + self.logger.info('ignoring daemon error: {}'.format(e)) + except asyncio.CancelledError: + # This aids clean shutdowns + self.stop = True + break + def async_process_some(self, unfetched, limit): loop = asyncio.get_event_loop() pending = [] txs = self.txs - first = True async def process(unprocessed): - nonlocal first, pending + nonlocal pending raw_txs = {} while unprocessed and len(raw_txs) < limit: @@ -169,16 +161,12 @@ class MemPool(util.LoggedClass): touched.add(hash168) hash168s[hash168].add(hex_hash) - to_do = len(unfetched) + len(unprocessed) - if to_do and txs: - percent = max(0, len(txs) - to_do) * 100 // len(txs) - self.logger.info('catchup {:d}% complete'.format(percent)) - elif first: - first = False - self.logger.info('caught up') - return process + def processing_new_block(self): + '''Return True if we're processing a new block.''' + return self.daemon.cached_height() > self.db.db_height + async def fetch_raw_txs(self, hex_hashes): '''Fetch a list of mempool transactions.''' 
raw_txs = await self.daemon.getrawtransactions(hex_hashes) diff --git a/server/protocol.py b/server/protocol.py index 95f8922..9f054b5 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -53,12 +53,9 @@ class ServerManager(util.LoggedClass): def __init__(self, env): super().__init__() self.loop = asyncio.get_event_loop() - self.touched = set() - self.touched_event = asyncio.Event() self.start = time.time() - self.bp = BlockProcessor(env, self.touched, self.touched_event) - self.mempool = MemPool(self.bp.daemon, env.coin, self.bp, - self.touched, self.touched_event) + self.bp = BlockProcessor(env) + self.mempool = MemPool(self.bp.daemon, env.coin, self.bp) self.irc = IRC(env) self.env = env self.servers = {} @@ -178,11 +175,11 @@ class ServerManager(util.LoggedClass): self.futures.append(asyncio.ensure_future(coro)) # shutdown() assumes bp.main_loop() is first - add_future(self.bp.main_loop()) + add_future(self.bp.main_loop(self.mempool.touched)) add_future(self.bp.prefetcher.main_loop()) add_future(self.irc.start(self.bp.event)) add_future(self.start_servers(self.bp.event)) - add_future(self.mempool.main_loop(self.bp.event)) + add_future(self.mempool.main_loop()) add_future(self.enqueue_delayed_sessions()) add_future(self.notify()) for n in range(4): @@ -245,10 +242,10 @@ class ServerManager(util.LoggedClass): async def notify(self): '''Notify sessions about height changes and touched addresses.''' while True: - await self.touched_event.wait() - touched = self.touched.copy() - self.touched.clear() - self.touched_event.clear() + await self.mempool.touched_event.wait() + touched = self.mempool.touched.copy() + self.mempool.touched.clear() + self.mempool.touched_event.clear() # Invalidate caches hc = self.history_cache diff --git a/server/version.py b/server/version.py index a590391..55a9e55 100644 --- a/server/version.py +++ b/server/version.py @@ -1 +1 @@ -VERSION = "ElectrumX 0.9.7" +VERSION = "ElectrumX 0.9.8"