diff --git a/RELEASE-NOTES b/RELEASE-NOTES index 859abc7..7890a47 100644 --- a/RELEASE-NOTES +++ b/RELEASE-NOTES @@ -1,3 +1,9 @@ +version 0.9.9 +------------- + +- prioritize mempool processing of sent txs. Closes issue 73. +- mempool tx processing needs to handle DBError exceptions. Fixes issue 74. + version 0.9.8 ------------- diff --git a/server/block_processor.py b/server/block_processor.py index 7d364a4..2adaa33 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -67,8 +67,13 @@ class Prefetcher(LoggedClass): async def main_loop(self): '''Loop forever polling for more blocks.''' - self.logger.info('catching up to daemon height {:,d}...' - .format(await self.daemon.height())) + daemon_height = await self.daemon.height() + if daemon_height > self.fetched_height: + log_msg = 'catching up to daemon height {:,d}...' + else: + log_msg = 'caught up to daemon height {:,d}' + self.logger.info(log_msg.format(daemon_height)) + while True: try: secs = 0 @@ -87,10 +92,7 @@ class Prefetcher(LoggedClass): '''Prefetch blocks unless the prefetch queue is full.''' # Refresh the mempool after updating the daemon height, if and # only if we've caught up - daemon_height = await self.daemon.height() - if self.caught_up: - await self.daemon.refresh_mempool_hashes() - + daemon_height = await self.daemon.height(mempool=self.caught_up) cache_room = self.target_cache_size // self.ave_size with await self.semaphore: # Try and catch up all blocks but limit to room in cache. diff --git a/server/daemon.py b/server/daemon.py index 42897b3..2fb2e44 100644 --- a/server/daemon.py +++ b/server/daemon.py @@ -36,7 +36,7 @@ class Daemon(util.LoggedClass): self.urls = urls self.url_index = 0 self._height = None - self.mempool_hashes = set() + self._mempool_hashes = set() self.mempool_refresh_event = asyncio.Event() # Limit concurrent RPC calls to this number. # See DEFAULT_HTTP_WORKQUEUE in bitcoind, which is typically 16 @@ -150,10 +150,9 @@ class Daemon(util.LoggedClass): # Convert hex string to bytes return [bytes.fromhex(block) for block in blocks] - async def refresh_mempool_hashes(self): + async def mempool_hashes(self): '''Update our record of the daemon's mempool hashes.''' - self.mempool_hashes = set(await self._send_single('getrawmempool')) - self.mempool_refresh_event.set() + return await self._send_single('getrawmempool') async def estimatefee(self, params): '''Return the fee estimate for the given parameters.''' @@ -187,11 +186,18 @@ class Daemon(util.LoggedClass): '''Broadcast a transaction to the network.''' return await self._send_single('sendrawtransaction', params) - async def height(self): + async def height(self, mempool=False): '''Query the daemon for its current height.''' self._height = await self._send_single('getblockcount') + if mempool: + self._mempool_hashes = set(await self.mempool_hashes()) + self.mempool_refresh_event.set() return self._height + def cached_mempool_hashes(self): + '''Return the cached mempool hashes.''' + return self._mempool_hashes + def cached_height(self): '''Return the cached daemon height. diff --git a/server/mempool.py b/server/mempool.py index 0984154..ce149f3 100644 --- a/server/mempool.py +++ b/server/mempool.py @@ -11,7 +11,6 @@ import asyncio import itertools import time from collections import defaultdict -from functools import partial from lib.hash import hash_to_str, hex_str_to_hash from lib.tx import Deserializer @@ -40,10 +39,16 @@ class MemPool(util.LoggedClass): self.db = db self.touched = set() self.touched_event = asyncio.Event() + self.prioritized = set() self.stop = False self.txs = {} self.hash168s = defaultdict(set) # None can be a key + def prioritize(self, tx_hash): + '''Prioritize processing the given hash. This is important during + initial mempool sync.''' + self.prioritized.add(tx_hash) + def resync_daemon_hashes(self, unprocessed, unfetched): '''Re-sync self.txs with the list of hashes in the daemon's mempool. @@ -54,7 +59,7 @@ class MemPool(util.LoggedClass): hash168s = self.hash168s touched = self.touched - hashes = self.daemon.mempool_hashes + hashes = self.daemon.cached_mempool_hashes() gone = set(txs).difference(hashes) for hex_hash in gone: unfetched.discard(hex_hash) @@ -105,6 +110,7 @@ class MemPool(util.LoggedClass): self.logger.info('{:,d} txs touching {:,d} addresses' .format(len(txs), len(self.hash168s))) next_log = now + 150 + self.prioritized.clear() await self.daemon.mempool_refresh_event.wait() self.resync_daemon_hashes(unprocessed, unfetched) @@ -137,6 +143,11 @@ class MemPool(util.LoggedClass): nonlocal pending raw_txs = {} + + for hex_hash in self.prioritized: + if hex_hash in unprocessed: + raw_txs[hex_hash] = unprocessed.pop(hex_hash) + while unprocessed and len(raw_txs) < limit: hex_hash, raw_tx = unprocessed.popitem() raw_txs[hex_hash] = raw_tx @@ -147,9 +158,9 @@ class MemPool(util.LoggedClass): deferred = pending pending = [] - process_raw_txs = partial(self.process_raw_txs, raw_txs, deferred) - result, deferred = ( - await loop.run_in_executor(None, process_raw_txs)) + def job(): + return self.process_raw_txs(raw_txs, deferred) + result, deferred = await loop.run_in_executor(None, job) pending.extend(deferred) hash168s = self.hash168s @@ -231,10 +242,11 @@ class MemPool(util.LoggedClass): elif not mempool_missing: prev_hash = hex_str_to_hash(prev_hex_hash) txin_pairs.append(db_utxo_lookup(prev_hash, prev_idx)) - except self.db.MissingUTXOError: - # This typically happens just after the daemon has - # accepted a new block and the new mempool has deps on - # new txs in that block. + except (self.db.MissingUTXOError, self.db.DBError): + # DBError can happen when flushing a newly processed + # block. MissingUTXOError typically happens just + # after the daemon has accepted a new block and the + # new mempool has deps on new txs in that block. continue if mempool_missing: diff --git a/server/protocol.py b/server/protocol.py index 9f054b5..020469f 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -104,6 +104,11 @@ class ServerManager(util.LoggedClass): ''' return self.mempool.value(hash168) + def sent_tx(self, tx_hash): + '''Call when a TX is sent. Tells mempool to prioritize it.''' + self.txs_sent += 1 + self.mempool.prioritize(tx_hash) + def setup_bands(self): bands = [] limit = self.env.bandwidth_limit @@ -898,8 +903,8 @@ class ElectrumX(Session): try: tx_hash = await self.daemon.sendrawtransaction(params) self.txs_sent += 1 - self.manager.txs_sent += 1 self.log_info('sent tx: {}'.format(tx_hash)) + self.manager.sent_tx(tx_hash) return tx_hash except DaemonError as e: error = e.args[0] diff --git a/server/version.py b/server/version.py index 55a9e55..6ea65e0 100644 --- a/server/version.py +++ b/server/version.py @@ -1 +1 @@ -VERSION = "ElectrumX 0.9.8" +VERSION = "ElectrumX 0.9.9"