Take sleep intervals as arguments

- make histogram refresh its own task
- make _update_histogram take bin_size argument
- synchronize the mempool refresh and hisogram calc with a lock
This commit is contained in:
Neil Booth 2018-08-11 08:16:26 +09:00
parent f20fe9d7a5
commit f781d74ed5

View File

@ -10,6 +10,7 @@
import itertools
import time
from abc import ABC, abstractmethod
from asyncio import Lock
from collections import defaultdict
import attr
@ -92,7 +93,7 @@ class MemPool(object):
hashXs: hashX -> set of all hashes of txs touching the hashX
'''
def __init__(self, coin, api):
def __init__(self, coin, api, refresh_secs=5.0, log_status_secs=120.0):
assert isinstance(api, MemPoolAPI)
self.coin = coin
self.api = api
@ -100,6 +101,10 @@ class MemPool(object):
self.txs = {}
self.hashXs = defaultdict(set) # None can be a key
self.cached_compact_histogram = []
self.refresh_secs = refresh_secs
self.log_status_secs = log_status_secs
# Prevents mempool refreshes during fee histogram calculation
self.lock = Lock()
async def _logging(self, synchronized_event):
'''Print regular logs of mempool stats.'''
@ -112,10 +117,18 @@ class MemPool(object):
while True:
self.logger.info(f'{len(self.txs):,d} txs '
f'touching {len(self.hashXs):,d} addresses')
await sleep(120)
await sleep(self.log_status_secs)
await synchronized_event.wait()
def _update_histogram(self):
async def _refresh_histogram(self, synchronized_event):
while True:
await synchronized_event.wait()
async with self.lock:
# Threaded as can be expensive
await run_in_thread(self._update_histogram, 100_000)
await sleep(self.coin.MEMPOOL_HISTOGRAM_REFRESH_SECS)
def _update_histogram(self, bin_size):
# Build a histogram by fee rate
histogram = defaultdict(int)
for tx in self.txs.values():
@ -132,7 +145,6 @@ class MemPool(object):
compact = []
cum_size = 0
r = 0 # ?
bin_size = 100 * 1000
for fee_rate, size in sorted(histogram.items(), reverse=True):
cum_size += size
if cum_size + r > bin_size:
@ -187,22 +199,18 @@ class MemPool(object):
async def _refresh_hashes(self, synchronized_event):
'''Refresh our view of the daemon's mempool.'''
secs = 5
histogram_refresh = self.coin.MEMPOOL_HISTOGRAM_REFRESH_SECS // secs
for loop_count in itertools.count():
while True:
height = self.api.cached_height()
hex_hashes = await self.api.mempool_hashes()
if height != await self.api.height():
continue
hashes = set(hex_str_to_hash(hh) for hh in hex_hashes)
touched = await self._process_mempool(hashes)
async with self.lock:
touched = await self._process_mempool(hashes)
synchronized_event.set()
synchronized_event.clear()
await self.api.on_mempool(touched, height)
# Thread mempool histogram refreshes - they can be expensive
if loop_count % histogram_refresh == 0:
await run_in_thread(self._update_histogram)
await sleep(secs)
await sleep(self.refresh_secs)
async def _process_mempool(self, all_hashes):
# Re-sync with the new set of hashes
@ -294,6 +302,7 @@ class MemPool(object):
'''Keep the mempool synchronized with the daemon.'''
async with TaskGroup(wait=any) as group:
await group.spawn(self._refresh_hashes(synchronized_event))
await group.spawn(self._refresh_histogram(synchronized_event))
await group.spawn(self._logging(synchronized_event))
async def balance_delta(self, hashX):