From ac48695db865250b6a668fa323ceb53bccaf352a Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 15 Dec 2016 15:38:22 +0900 Subject: [PATCH 1/4] daemon: getting height optionally gets mempool Improve daemon startup log message --- server/block_processor.py | 14 ++++++++------ server/daemon.py | 16 +++++++++++----- server/mempool.py | 2 +- 3 files changed, 20 insertions(+), 12 deletions(-) diff --git a/server/block_processor.py b/server/block_processor.py index 7d364a4..38a978c 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -67,8 +67,13 @@ class Prefetcher(LoggedClass): async def main_loop(self): '''Loop forever polling for more blocks.''' - self.logger.info('catching up to daemon height {:,d}...' - .format(await self.daemon.height())) + daemon_height = await self.daemon.height() + if daemon_height > height: + log_msg = 'catching up to daemon height {:,d}...' + else: + log_msg = 'caught up to daemon height {:,d}' + self.logger.info(log_msg.format(daemon_height)) + while True: try: secs = 0 @@ -87,10 +92,7 @@ class Prefetcher(LoggedClass): '''Prefetch blocks unless the prefetch queue is full.''' # Refresh the mempool after updating the daemon height, if and # only if we've caught up - daemon_height = await self.daemon.height() - if self.caught_up: - await self.daemon.refresh_mempool_hashes() - + daemon_height = await self.daemon.height(mempool=self.caught_up) cache_room = self.target_cache_size // self.ave_size with await self.semaphore: # Try and catch up all blocks but limit to room in cache. diff --git a/server/daemon.py b/server/daemon.py index 42897b3..2fb2e44 100644 --- a/server/daemon.py +++ b/server/daemon.py @@ -36,7 +36,7 @@ class Daemon(util.LoggedClass): self.urls = urls self.url_index = 0 self._height = None - self.mempool_hashes = set() + self._mempool_hashes = set() self.mempool_refresh_event = asyncio.Event() # Limit concurrent RPC calls to this number. # See DEFAULT_HTTP_WORKQUEUE in bitcoind, which is typically 16 @@ -150,10 +150,9 @@ class Daemon(util.LoggedClass): # Convert hex string to bytes return [bytes.fromhex(block) for block in blocks] - async def refresh_mempool_hashes(self): + async def mempool_hashes(self): '''Update our record of the daemon's mempool hashes.''' - self.mempool_hashes = set(await self._send_single('getrawmempool')) - self.mempool_refresh_event.set() + return await self._send_single('getrawmempool') async def estimatefee(self, params): '''Return the fee estimate for the given parameters.''' @@ -187,11 +186,18 @@ class Daemon(util.LoggedClass): '''Broadcast a transaction to the network.''' return await self._send_single('sendrawtransaction', params) - async def height(self): + async def height(self, mempool=False): '''Query the daemon for its current height.''' self._height = await self._send_single('getblockcount') + if mempool: + self._mempool_hashes = set(await self.mempool_hashes()) + self.mempool_refresh_event.set() return self._height + def cached_mempool_hashes(self): + '''Return the cached mempool hashes.''' + return self._mempool_hashes + def cached_height(self): '''Return the cached daemon height. diff --git a/server/mempool.py b/server/mempool.py index 0984154..94c3336 100644 --- a/server/mempool.py +++ b/server/mempool.py @@ -54,7 +54,7 @@ class MemPool(util.LoggedClass): hash168s = self.hash168s touched = self.touched - hashes = self.daemon.mempool_hashes + hashes = self.daemon.cached_mempool_hashes() gone = set(txs).difference(hashes) for hex_hash in gone: unfetched.discard(hex_hash) From 5c80b96d0f7b0829cdc46a3666aa7a9eb7c66ca1 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 15 Dec 2016 15:55:09 +0900 Subject: [PATCH 2/4] Prioritize mempool processing of sent txs Closes #73 --- server/block_processor.py | 2 +- server/mempool.py | 19 +++++++++++++++---- server/protocol.py | 7 ++++++- 3 files changed, 22 insertions(+), 6 deletions(-) diff --git a/server/block_processor.py b/server/block_processor.py index 38a978c..2adaa33 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -68,7 +68,7 @@ class Prefetcher(LoggedClass): async def main_loop(self): '''Loop forever polling for more blocks.''' daemon_height = await self.daemon.height() - if daemon_height > height: + if daemon_height > self.fetched_height: log_msg = 'catching up to daemon height {:,d}...' else: log_msg = 'caught up to daemon height {:,d}' diff --git a/server/mempool.py b/server/mempool.py index 94c3336..0f0f283 100644 --- a/server/mempool.py +++ b/server/mempool.py @@ -11,7 +11,6 @@ import asyncio import itertools import time from collections import defaultdict -from functools import partial from lib.hash import hash_to_str, hex_str_to_hash from lib.tx import Deserializer @@ -40,10 +39,16 @@ class MemPool(util.LoggedClass): self.db = db self.touched = set() self.touched_event = asyncio.Event() + self.prioritized = set() self.stop = False self.txs = {} self.hash168s = defaultdict(set) # None can be a key + def prioritize(self, tx_hash): + '''Prioritize processing the given hash. This is important during + initial mempool sync.''' + self.prioritized.add(tx_hash) + def resync_daemon_hashes(self, unprocessed, unfetched): '''Re-sync self.txs with the list of hashes in the daemon's mempool. @@ -105,6 +110,7 @@ class MemPool(util.LoggedClass): self.logger.info('{:,d} txs touching {:,d} addresses' .format(len(txs), len(self.hash168s))) next_log = now + 150 + self.prioritized.clear() await self.daemon.mempool_refresh_event.wait() self.resync_daemon_hashes(unprocessed, unfetched) @@ -137,6 +143,11 @@ class MemPool(util.LoggedClass): nonlocal pending raw_txs = {} + + for hex_hash in self.prioritized: + if hex_hash in unprocessed: + raw_txs[hex_hash] = unprocessed.pop(hex_hash) + while unprocessed and len(raw_txs) < limit: hex_hash, raw_tx = unprocessed.popitem() raw_txs[hex_hash] = raw_tx @@ -147,9 +158,9 @@ class MemPool(util.LoggedClass): deferred = pending pending = [] - process_raw_txs = partial(self.process_raw_txs, raw_txs, deferred) - result, deferred = ( - await loop.run_in_executor(None, process_raw_txs)) + def job(): + return self.process_raw_txs(raw_txs, deferred) + result, deferred = await loop.run_in_executor(None, job) pending.extend(deferred) hash168s = self.hash168s diff --git a/server/protocol.py b/server/protocol.py index 9f054b5..020469f 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -104,6 +104,11 @@ class ServerManager(util.LoggedClass): ''' return self.mempool.value(hash168) + def sent_tx(self, tx_hash): + '''Call when a TX is sent. Tells mempool to prioritize it.''' + self.txs_sent += 1 + self.mempool.prioritize(tx_hash) + def setup_bands(self): bands = [] limit = self.env.bandwidth_limit @@ -898,8 +903,8 @@ class ElectrumX(Session): try: tx_hash = await self.daemon.sendrawtransaction(params) self.txs_sent += 1 - self.manager.txs_sent += 1 self.log_info('sent tx: {}'.format(tx_hash)) + self.manager.sent_tx(tx_hash) return tx_hash except DaemonError as e: error = e.args[0] From 49714a45e67ca125414f3f31c8dfe1b9b3c8cfe8 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 15 Dec 2016 16:56:37 +0900 Subject: [PATCH 3/4] Mempool tx processing to handle DBError Fixes #74 --- server/mempool.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/server/mempool.py b/server/mempool.py index 0f0f283..ce149f3 100644 --- a/server/mempool.py +++ b/server/mempool.py @@ -242,10 +242,11 @@ class MemPool(util.LoggedClass): elif not mempool_missing: prev_hash = hex_str_to_hash(prev_hex_hash) txin_pairs.append(db_utxo_lookup(prev_hash, prev_idx)) - except self.db.MissingUTXOError: - # This typically happens just after the daemon has - # accepted a new block and the new mempool has deps on - # new txs in that block. + except (self.db.MissingUTXOError, self.db.DBError): + # DBError can happen when flushing a newly processed + # block. MissingUTXOError typically happens just + # after the daemon has accepted a new block and the + # new mempool has deps on new txs in that block. continue if mempool_missing: From 35f07bd01e0470c29e0d82cb17ba5905609a457f Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 15 Dec 2016 15:56:30 +0900 Subject: [PATCH 4/4] Prepare 0.9.9 --- RELEASE-NOTES | 6 ++++++ server/version.py | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/RELEASE-NOTES b/RELEASE-NOTES index 859abc7..7890a47 100644 --- a/RELEASE-NOTES +++ b/RELEASE-NOTES @@ -1,3 +1,9 @@ +version 0.9.9 +------------- + +- prioritize mempool processing of sent txs. Closes issue 73. +- mempool tx processing needs to handle DBError exceptions. Fixes issue 74. + version 0.9.8 ------------- diff --git a/server/version.py b/server/version.py index 55a9e55..6ea65e0 100644 --- a/server/version.py +++ b/server/version.py @@ -1 +1 @@ -VERSION = "ElectrumX 0.9.8" +VERSION = "ElectrumX 0.9.9"