Prefetcher cleanup
This commit is contained in:
parent
ceecdc54ac
commit
e717e719c1
@ -49,16 +49,12 @@ class Prefetcher(LoggedClass):
|
|||||||
self.semaphore = asyncio.Semaphore()
|
self.semaphore = asyncio.Semaphore()
|
||||||
self.queue = asyncio.Queue()
|
self.queue = asyncio.Queue()
|
||||||
self.queue_size = 0
|
self.queue_size = 0
|
||||||
|
self.fetched_height = height
|
||||||
|
self.mempool = []
|
||||||
# Target cache size. Has little effect on sync time.
|
# Target cache size. Has little effect on sync time.
|
||||||
self.target_cache_size = 10 * 1024 * 1024
|
self.target_cache_size = 10 * 1024 * 1024
|
||||||
self.fetched_height = height
|
# First fetch to be 10 blocks
|
||||||
self.recent_sizes = [0]
|
self.ave_size = self.target_cache_size // 10
|
||||||
|
|
||||||
async def get_blocks(self):
|
|
||||||
'''Returns a list of prefetched blocks.'''
|
|
||||||
blocks, total_size = await self.queue.get()
|
|
||||||
self.queue_size -= total_size
|
|
||||||
return blocks
|
|
||||||
|
|
||||||
async def clear(self, height):
|
async def clear(self, height):
|
||||||
'''Clear prefetched blocks and restart from the given height.
|
'''Clear prefetched blocks and restart from the given height.
|
||||||
@ -73,49 +69,73 @@ class Prefetcher(LoggedClass):
|
|||||||
self.queue_size = 0
|
self.queue_size = 0
|
||||||
self.fetched_height = height
|
self.fetched_height = height
|
||||||
|
|
||||||
|
async def get_blocks(self):
|
||||||
|
'''Returns a list of prefetched blocks and the mempool.'''
|
||||||
|
blocks, height, size = await self.queue.get()
|
||||||
|
self.queue_size -= size
|
||||||
|
if height == self.daemon.cached_height():
|
||||||
|
return blocks, self.mempool
|
||||||
|
else:
|
||||||
|
return blocks, None
|
||||||
|
|
||||||
async def start(self):
|
async def start(self):
|
||||||
'''Loop forever polling for more blocks.'''
|
'''Loop forever polling for more blocks.'''
|
||||||
self.logger.info('looping forever prefetching blocks...')
|
self.logger.info('starting prefetch loop...')
|
||||||
while True:
|
while True:
|
||||||
while self.queue_size < self.target_cache_size:
|
try:
|
||||||
try:
|
if await self._caught_up():
|
||||||
with await self.semaphore:
|
await asyncio.sleep(5)
|
||||||
await self._prefetch()
|
else:
|
||||||
except DaemonError as e:
|
await asyncio.sleep(0)
|
||||||
self.logger.info('ignoring daemon errors: {}'.format(e))
|
except DaemonError as e:
|
||||||
await asyncio.sleep(2)
|
self.logger.info('ignoring daemon errors: {}'.format(e))
|
||||||
|
|
||||||
def _prefill_count(self, room):
|
async def _caught_up(self):
|
||||||
ave_size = sum(self.recent_sizes) // len(self.recent_sizes)
|
'''Poll for new blocks and mempool state.
|
||||||
count = room // ave_size if ave_size else 0
|
|
||||||
return max(count, 10)
|
Mempool is only queried if caught up with daemon.'''
|
||||||
|
with await self.semaphore:
|
||||||
|
blocks, size = await self._prefetch()
|
||||||
|
self.fetched_height += len(blocks)
|
||||||
|
caught_up = self.fetched_height == self.daemon.cached_height()
|
||||||
|
if caught_up:
|
||||||
|
self.mempool = await self.daemon.mempool_hashes()
|
||||||
|
|
||||||
|
# Wake up block processor if we have something
|
||||||
|
if blocks or caught_up:
|
||||||
|
self.queue.put_nowait((blocks, self.fetched_height, size))
|
||||||
|
self.queue_size += size
|
||||||
|
|
||||||
|
return caught_up
|
||||||
|
|
||||||
async def _prefetch(self):
|
async def _prefetch(self):
|
||||||
'''Prefetch blocks if there are any to prefetch.'''
|
'''Prefetch blocks unless the prefetch queue is full.'''
|
||||||
|
if self.queue_size >= self.target_cache_size:
|
||||||
|
return [], 0
|
||||||
|
|
||||||
daemon_height = await self.daemon.height()
|
daemon_height = await self.daemon.height()
|
||||||
max_count = min(daemon_height - self.fetched_height, 4000)
|
cache_room = self.target_cache_size // self.ave_size
|
||||||
count = min(max_count, self._prefill_count(self.target_cache_size))
|
|
||||||
|
# Try and catch up all blocks but limit to room in cache.
|
||||||
|
# Constrain count to between 0 and 4000 regardless
|
||||||
|
count = min(daemon_height - self.fetched_height, cache_room)
|
||||||
|
count = min(4000, max(count, 0))
|
||||||
if not count:
|
if not count:
|
||||||
return 0
|
return [], 0
|
||||||
|
|
||||||
first = self.fetched_height + 1
|
first = self.fetched_height + 1
|
||||||
hex_hashes = await self.daemon.block_hex_hashes(first, count)
|
hex_hashes = await self.daemon.block_hex_hashes(first, count)
|
||||||
if not hex_hashes:
|
|
||||||
self.logger.error('requested {:,d} hashes, got none'.format(count))
|
|
||||||
return 0
|
|
||||||
|
|
||||||
blocks = await self.daemon.raw_blocks(hex_hashes)
|
blocks = await self.daemon.raw_blocks(hex_hashes)
|
||||||
sizes = [len(block) for block in blocks]
|
|
||||||
total_size = sum(sizes)
|
|
||||||
self.queue.put_nowait((blocks, total_size))
|
|
||||||
self.queue_size += total_size
|
|
||||||
self.fetched_height += len(blocks)
|
|
||||||
|
|
||||||
# Keep 50 most recent block sizes for fetch count estimation
|
size = sum(len(block) for block in blocks)
|
||||||
self.recent_sizes.extend(sizes)
|
|
||||||
excess = len(self.recent_sizes) - 50
|
# Update our recent average block size estimate
|
||||||
if excess > 0:
|
if count >= 10:
|
||||||
self.recent_sizes = self.recent_sizes[excess:]
|
self.ave_size = size // count
|
||||||
return count
|
else:
|
||||||
|
self.ave_size = (size + (10 - count) * self.ave_size) // 10
|
||||||
|
|
||||||
|
return blocks, size
|
||||||
|
|
||||||
|
|
||||||
class BlockProcessor(LoggedClass):
|
class BlockProcessor(LoggedClass):
|
||||||
@ -224,7 +244,7 @@ class BlockProcessor(LoggedClass):
|
|||||||
async def wait_for_blocks(self):
|
async def wait_for_blocks(self):
|
||||||
'''Loop forever processing blocks in the forward direction.'''
|
'''Loop forever processing blocks in the forward direction.'''
|
||||||
while True:
|
while True:
|
||||||
blocks = await self.prefetcher.get_blocks()
|
blocks, mempool = await self.prefetcher.get_blocks()
|
||||||
for block in blocks:
|
for block in blocks:
|
||||||
if not self.advance_block(block):
|
if not self.advance_block(block):
|
||||||
await self.handle_chain_reorg()
|
await self.handle_chain_reorg()
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user