From f781d74ed5699b8269339f3199fe81d6e82eab14 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sat, 11 Aug 2018 08:16:26 +0900 Subject: [PATCH] Take sleep intervals as arguments - make histogram refresh its own task - make _update_histogram take bin_size argument - synchronize the mempool refresh and hisogram calc with a lock --- electrumx/server/mempool.py | 33 +++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/electrumx/server/mempool.py b/electrumx/server/mempool.py index 67e3826..1481171 100644 --- a/electrumx/server/mempool.py +++ b/electrumx/server/mempool.py @@ -10,6 +10,7 @@ import itertools import time from abc import ABC, abstractmethod +from asyncio import Lock from collections import defaultdict import attr @@ -92,7 +93,7 @@ class MemPool(object): hashXs: hashX -> set of all hashes of txs touching the hashX ''' - def __init__(self, coin, api): + def __init__(self, coin, api, refresh_secs=5.0, log_status_secs=120.0): assert isinstance(api, MemPoolAPI) self.coin = coin self.api = api @@ -100,6 +101,10 @@ class MemPool(object): self.txs = {} self.hashXs = defaultdict(set) # None can be a key self.cached_compact_histogram = [] + self.refresh_secs = refresh_secs + self.log_status_secs = log_status_secs + # Prevents mempool refreshes during fee histogram calculation + self.lock = Lock() async def _logging(self, synchronized_event): '''Print regular logs of mempool stats.''' @@ -112,10 +117,18 @@ class MemPool(object): while True: self.logger.info(f'{len(self.txs):,d} txs ' f'touching {len(self.hashXs):,d} addresses') - await sleep(120) + await sleep(self.log_status_secs) await synchronized_event.wait() - def _update_histogram(self): + async def _refresh_histogram(self, synchronized_event): + while True: + await synchronized_event.wait() + async with self.lock: + # Threaded as can be expensive + await run_in_thread(self._update_histogram, 100_000) + await sleep(self.coin.MEMPOOL_HISTOGRAM_REFRESH_SECS) + + def _update_histogram(self, bin_size): # Build a histogram by fee rate histogram = defaultdict(int) for tx in self.txs.values(): @@ -132,7 +145,6 @@ class MemPool(object): compact = [] cum_size = 0 r = 0 # ? - bin_size = 100 * 1000 for fee_rate, size in sorted(histogram.items(), reverse=True): cum_size += size if cum_size + r > bin_size: @@ -187,22 +199,18 @@ class MemPool(object): async def _refresh_hashes(self, synchronized_event): '''Refresh our view of the daemon's mempool.''' - secs = 5 - histogram_refresh = self.coin.MEMPOOL_HISTOGRAM_REFRESH_SECS // secs - for loop_count in itertools.count(): + while True: height = self.api.cached_height() hex_hashes = await self.api.mempool_hashes() if height != await self.api.height(): continue hashes = set(hex_str_to_hash(hh) for hh in hex_hashes) - touched = await self._process_mempool(hashes) + async with self.lock: + touched = await self._process_mempool(hashes) synchronized_event.set() synchronized_event.clear() await self.api.on_mempool(touched, height) - # Thread mempool histogram refreshes - they can be expensive - if loop_count % histogram_refresh == 0: - await run_in_thread(self._update_histogram) - await sleep(secs) + await sleep(self.refresh_secs) async def _process_mempool(self, all_hashes): # Re-sync with the new set of hashes @@ -294,6 +302,7 @@ class MemPool(object): '''Keep the mempool synchronized with the daemon.''' async with TaskGroup(wait=any) as group: await group.spawn(self._refresh_hashes(synchronized_event)) + await group.spawn(self._refresh_histogram(synchronized_event)) await group.spawn(self._logging(synchronized_event)) async def balance_delta(self, hashX):