Revert "More logical mempool hash handling"
This reverts commit 87d24f38dc.
parent bf202f8395
commit 2ade5fce9c
@@ -29,6 +29,7 @@ class Prefetcher(LoggedClass):
     def __init__(self, bp):
         super().__init__()
         self.bp = bp
         self.caught_up = False
+        # Access to fetched_height should be protected by the semaphore
         self.fetched_height = None
         self.semaphore = asyncio.Semaphore()
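Aside: the restored comment says access to fetched_height should be protected by the semaphore. asyncio.Semaphore() defaults to a value of 1, so it behaves as a mutex. A minimal sketch of that guard (not ElectrumX code; it uses the modern `async with` spelling of the `with await self.semaphore:` seen in the next hunk):

    import asyncio

    class Prefetcher:
        def __init__(self):
            self.fetched_height = 0
            # value defaults to 1, so the semaphore acts as a mutex
            self.semaphore = asyncio.Semaphore()

        async def bump(self):
            # Modern spelling of the diff's `with await self.semaphore:`
            async with self.semaphore:
                self.fetched_height += 1

    async def main():
        p = Prefetcher()
        await asyncio.gather(*(p.bump() for _ in range(10)))
        print(p.fetched_height)  # 10

    asyncio.run(main())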
@@ -83,14 +84,7 @@ class Prefetcher(LoggedClass):
         Repeats until the queue is full or caught up.
         '''
         daemon = self.bp.daemon
-        # If caught up, refresh the mempool before the current height
-        caught_up = self.bp.caught_up_event.is_set()
-        if caught_up:
-            mempool = await daemon.mempool_hashes()
-        else:
-            mempool = []
-
-        daemon_height = await daemon.height()
+        daemon_height = await daemon.height(self.bp.caught_up_event.is_set())
         with await self.semaphore:
             while self.cache_size < self.min_cache_size:
                 # Try and catch up all blocks but limit to room in cache.
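Aside: the restored call site folds the mempool refresh into the height poll; once the block processor has caught up, daemon.height(True) also re-fetches the mempool hashes and signals mempool_refresh_event. A self-contained sketch of that side-effect pattern with a stand-in daemon (names simplified, the RPC is faked):

    import asyncio

    class StubDaemon:
        def __init__(self):
            self._height = 0
            self._mempool_hashes = set()
            self.mempool_refresh_event = asyncio.Event()

        async def mempool_hashes(self):
            # The real method sends a getrawmempool RPC; fake two hashes here.
            return ['aa' * 32, 'bb' * 32]

        async def height(self, mempool=False):
            self._height += 1  # the real method sends getblockcount
            if mempool:
                self._mempool_hashes = set(await self.mempool_hashes())
                self.mempool_refresh_event.set()
            return self._height

    async def main():
        daemon = StubDaemon()
        print(await daemon.height(False), daemon._mempool_hashes)  # no refresh
        print(await daemon.height(True), daemon._mempool_hashes)   # refreshed

    asyncio.run(main())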
@@ -100,15 +94,14 @@ class Prefetcher(LoggedClass):
                 count = min(daemon_height - self.fetched_height, cache_room)
                 count = min(500, max(count, 0))
                 if not count:
-                    if caught_up:
-                        self.bp.set_mempool_hashes(mempool)
-                    else:
+                    if not self.caught_up:
                         self.caught_up = True
                         self.bp.on_prefetcher_first_caught_up()
                     return False

                 first = self.fetched_height + 1
                 hex_hashes = await daemon.block_hex_hashes(first, count)
-                if caught_up:
+                if self.caught_up:
                     self.logger.info('new block height {:,d} hash {}'
                                      .format(first + count-1, hex_hashes[-1]))
                 blocks = await daemon.raw_blocks(hex_hashes)
@@ -128,7 +121,7 @@ class Prefetcher(LoggedClass):
                 else:
                     self.ave_size = (size + (10 - count) * self.ave_size) // 10

-                self.bp.on_prefetched_blocks(blocks, first, mempool)
+                self.bp.on_prefetched_blocks(blocks, first)
                 self.cache_size += size
                 self.fetched_height += count

@@ -195,10 +188,9 @@ class BlockProcessor(server.db.DB):
         '''Add the task to our task queue.'''
         self.task_queue.put_nowait(task)

-    def on_prefetched_blocks(self, blocks, first, mempool):
+    def on_prefetched_blocks(self, blocks, first):
         '''Called by the prefetcher when it has prefetched some blocks.'''
-        self.add_task(partial(self.check_and_advance_blocks, blocks, first,
-                              mempool))
+        self.add_task(partial(self.check_and_advance_blocks, blocks, first))

     def on_prefetcher_first_caught_up(self):
         '''Called by the prefetcher when it first catches up.'''
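Aside: on_prefetched_blocks() hands work to the processor by queueing a functools.partial, which freezes the arguments now and lets a worker await the call later. A hedged sketch of that queue-of-callables pattern (stand-in functions, not the ElectrumX classes):

    import asyncio
    from functools import partial

    async def check_and_advance_blocks(blocks, first):
        print('processing {:,d} blocks from height {:,d}'.format(len(blocks), first))

    async def worker(task_queue):
        while True:
            task = await task_queue.get()
            await task()  # calling the partial yields a coroutine

    async def main():
        task_queue = asyncio.Queue()
        task_queue.put_nowait(partial(check_and_advance_blocks, [b'raw'] * 3, 1000))
        runner = asyncio.ensure_future(worker(task_queue))
        await asyncio.sleep(0.1)
        runner.cancel()

    asyncio.run(main())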
@@ -233,10 +225,7 @@ class BlockProcessor(server.db.DB):
         self.open_dbs()
         self.caught_up_event.set()

-    def set_mempool_hashes(self, mempool):
-        self.controller.mempool.set_hashes(mempool)
-
-    async def check_and_advance_blocks(self, blocks, first, mempool):
+    async def check_and_advance_blocks(self, blocks, first):
         '''Process the list of blocks passed.  Detects and handles reorgs.'''
         self.prefetcher.processing_blocks(blocks)
         if first != self.height + 1:
@@ -262,7 +251,6 @@ class BlockProcessor(server.db.DB):
             self.logger.info('processed {:,d} block{} in {:.1f}s'
                              .format(len(blocks), s,
                                      time.time() - start))
-            self.set_mempool_hashes(mempool)
         elif hprevs[0] != chain[0]:
             await self.reorg_chain()
         else:
@@ -38,6 +38,8 @@ class Daemon(util.LoggedClass):
         super().__init__()
         self.set_urls(urls)
         self._height = None
+        self._mempool_hashes = set()
+        self.mempool_refresh_event = asyncio.Event()
         # Limit concurrent RPC calls to this number.
         # See DEFAULT_HTTP_WORKQUEUE in bitcoind, which is typically 16
         self.workqueue_semaphore = asyncio.Semaphore(value=10)
@@ -208,7 +210,7 @@ class Daemon(util.LoggedClass):
         return [bytes.fromhex(block) for block in blocks]

     async def mempool_hashes(self):
-        '''Return a list of the daemon's mempool hashes.'''
+        '''Update our record of the daemon's mempool hashes.'''
         return await self._send_single('getrawmempool')

     async def estimatefee(self, params):
@@ -243,11 +245,18 @@
         '''Broadcast a transaction to the network.'''
         return await self._send_single('sendrawtransaction', params)

-    async def height(self):
+    async def height(self, mempool=False):
         '''Query the daemon for its current height.'''
         self._height = await self._send_single('getblockcount')
+        if mempool:
+            self._mempool_hashes = set(await self.mempool_hashes())
+            self.mempool_refresh_event.set()
         return self._height

+    def cached_mempool_hashes(self):
+        '''Return the cached mempool hashes.'''
+        return self._mempool_hashes
+
     def cached_height(self):
         '''Return the cached daemon height.

@@ -37,8 +37,6 @@ class MemPool(util.LoggedClass):
         self.controller = controller
         self.coin = bp.coin
         self.db = bp
-        self.hashes = set()
-        self.mempool_refresh_event = asyncio.Event()
         self.touched = bp.touched
         self.touched_event = asyncio.Event()
         self.prioritized = set()
@@ -51,11 +49,6 @@
         initial mempool sync.'''
         self.prioritized.add(tx_hash)

-    def set_hashes(self, hashes):
-        '''Save the list of mempool hashes.'''
-        self.hashes = set(hashes)
-        self.mempool_refresh_event.set()
-
     def resync_daemon_hashes(self, unprocessed, unfetched):
         '''Re-sync self.txs with the list of hashes in the daemon's mempool.

@@ -66,7 +59,8 @@
         hashXs = self.hashXs
         touched = self.touched

-        gone = set(txs).difference(self.hashes)
+        hashes = self.daemon.cached_mempool_hashes()
+        gone = set(txs).difference(hashes)
         for hex_hash in gone:
             unfetched.discard(hex_hash)
             unprocessed.pop(hex_hash, None)
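Aside: resync_daemon_hashes() is two set differences. Hashes we track that have left the daemon's mempool are dropped; hashes the daemon has that we lack are queued for fetching. The same logic as a standalone (hypothetical) helper:

    def reconcile(tracked, daemon_hashes):
        '''Return (gone, new) relative to the daemon's mempool snapshot.'''
        gone = set(tracked).difference(daemon_hashes)  # mined or evicted
        new = set(daemon_hashes).difference(tracked)   # arrived since last sync
        return gone, new

    # 'a' left the mempool, 'c' arrived:
    gone, new = reconcile({'a', 'b'}, {'b', 'c'})
    assert gone == {'a'} and new == {'c'}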
@@ -81,7 +75,7 @@
                 del hashXs[hashX]
             touched.update(tx_hashXs)

-        new = self.hashes.difference(txs)
+        new = hashes.difference(txs)
         unfetched.update(new)
         for hex_hash in new:
             txs[hex_hash] = None
@@ -98,14 +92,15 @@
         fetch_size = 800
         process_some = self.async_process_some(unfetched, fetch_size // 2)

-        await self.mempool_refresh_event.wait()
+        await self.daemon.mempool_refresh_event.wait()
         self.logger.info('beginning processing of daemon mempool.  '
                          'This can take some time...')
         next_log = 0
         loops = -1  # Zero during initial catchup

         while True:
-            if self.touched:
+            # Avoid double notifications if processing a block
+            if self.touched and not self.processing_new_block():
                 self.touched_event.set()

             # Log progress / state
@@ -125,10 +120,10 @@
             try:
                 if not todo:
                     self.prioritized.clear()
-                    await self.mempool_refresh_event.wait()
+                    await self.daemon.mempool_refresh_event.wait()

                 self.resync_daemon_hashes(unprocessed, unfetched)
-                self.mempool_refresh_event.clear()
+                self.daemon.mempool_refresh_event.clear()

                 if unfetched:
                     count = min(len(unfetched), fetch_size)
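Aside: the loop above follows a wait/resync/clear discipline on the daemon's event: block until the daemon signals fresh hashes, resync against the newest snapshot, then clear so the next iteration blocks until the daemon signals again. A runnable sketch of that discipline (stand-in coroutines):

    import asyncio

    async def daemon_side(event):
        for _ in range(3):
            await asyncio.sleep(0.05)
            event.set()  # fresh mempool hashes are available

    async def mempool_side(event):
        for _ in range(3):
            await event.wait()
            print('resyncing with daemon mempool')
            event.clear()  # block until the daemon signals again

    async def main():
        event = asyncio.Event()
        await asyncio.gather(daemon_side(event), mempool_side(event))

    asyncio.run(main())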
@@ -182,6 +177,10 @@
         return process

+    def processing_new_block(self):
+        '''Return True if we're processing a new block.'''
+        return self.daemon.cached_height() > self.db.db_height
+
     async def fetch_raw_txs(self, hex_hashes):
         '''Fetch a list of mempool transactions.'''
         raw_txs = await self.daemon.getrawtransactions(hex_hashes)

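Aside: the restored guard suppresses touched_event while a freshly announced block is still being indexed, per the "Avoid double notifications" comment above; the daemon's cached height running ahead of the DB height is the tell. A sketch of the predicate with hypothetical stand-in objects:

    class StubDB:
        db_height = 100  # last block the index has fully processed

    class StubDaemon:
        def cached_height(self):
            return 101   # daemon has already announced the next block

    def processing_new_block(daemon, db):
        '''True while the daemon knows a block the DB has not indexed yet.'''
        return daemon.cached_height() > db.db_height

    print(processing_new_block(StubDaemon(), StubDB()))  # True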