From 5c80b96d0f7b0829cdc46a3666aa7a9eb7c66ca1 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 15 Dec 2016 15:55:09 +0900 Subject: [PATCH] Prioritize mempool processing of sent txs Closes #73 --- server/block_processor.py | 2 +- server/mempool.py | 19 +++++++++++++++---- server/protocol.py | 7 ++++++- 3 files changed, 22 insertions(+), 6 deletions(-) diff --git a/server/block_processor.py b/server/block_processor.py index 38a978c..2adaa33 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -68,7 +68,7 @@ class Prefetcher(LoggedClass): async def main_loop(self): '''Loop forever polling for more blocks.''' daemon_height = await self.daemon.height() - if daemon_height > height: + if daemon_height > self.fetched_height: log_msg = 'catching up to daemon height {:,d}...' else: log_msg = 'caught up to daemon height {:,d}' diff --git a/server/mempool.py b/server/mempool.py index 94c3336..0f0f283 100644 --- a/server/mempool.py +++ b/server/mempool.py @@ -11,7 +11,6 @@ import asyncio import itertools import time from collections import defaultdict -from functools import partial from lib.hash import hash_to_str, hex_str_to_hash from lib.tx import Deserializer @@ -40,10 +39,16 @@ class MemPool(util.LoggedClass): self.db = db self.touched = set() self.touched_event = asyncio.Event() + self.prioritized = set() self.stop = False self.txs = {} self.hash168s = defaultdict(set) # None can be a key + def prioritize(self, tx_hash): + '''Prioritize processing the given hash. This is important during + initial mempool sync.''' + self.prioritized.add(tx_hash) + def resync_daemon_hashes(self, unprocessed, unfetched): '''Re-sync self.txs with the list of hashes in the daemon's mempool. @@ -105,6 +110,7 @@ class MemPool(util.LoggedClass): self.logger.info('{:,d} txs touching {:,d} addresses' .format(len(txs), len(self.hash168s))) next_log = now + 150 + self.prioritized.clear() await self.daemon.mempool_refresh_event.wait() self.resync_daemon_hashes(unprocessed, unfetched) @@ -137,6 +143,11 @@ class MemPool(util.LoggedClass): nonlocal pending raw_txs = {} + + for hex_hash in self.prioritized: + if hex_hash in unprocessed: + raw_txs[hex_hash] = unprocessed.pop(hex_hash) + while unprocessed and len(raw_txs) < limit: hex_hash, raw_tx = unprocessed.popitem() raw_txs[hex_hash] = raw_tx @@ -147,9 +158,9 @@ class MemPool(util.LoggedClass): deferred = pending pending = [] - process_raw_txs = partial(self.process_raw_txs, raw_txs, deferred) - result, deferred = ( - await loop.run_in_executor(None, process_raw_txs)) + def job(): + return self.process_raw_txs(raw_txs, deferred) + result, deferred = await loop.run_in_executor(None, job) pending.extend(deferred) hash168s = self.hash168s diff --git a/server/protocol.py b/server/protocol.py index 9f054b5..020469f 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -104,6 +104,11 @@ class ServerManager(util.LoggedClass): ''' return self.mempool.value(hash168) + def sent_tx(self, tx_hash): + '''Call when a TX is sent. Tells mempool to prioritize it.''' + self.txs_sent += 1 + self.mempool.prioritize(tx_hash) + def setup_bands(self): bands = [] limit = self.env.bandwidth_limit @@ -898,8 +903,8 @@ class ElectrumX(Session): try: tx_hash = await self.daemon.sendrawtransaction(params) self.txs_sent += 1 - self.manager.txs_sent += 1 self.log_info('sent tx: {}'.format(tx_hash)) + self.manager.sent_tx(tx_hash) return tx_hash except DaemonError as e: error = e.args[0]