Move some daemon logic to daemon.py
This commit is contained in:
parent
6711ed0ae8
commit
897e68d20c
@ -164,14 +164,13 @@ class BlockCache(LoggedClass):
|
||||
self.daemon = daemon
|
||||
# Target cache size. Has little effect on sync time.
|
||||
self.target_cache_size = 10 * 1024 * 1024
|
||||
self.daemon_height = 0
|
||||
self.fetched_height = db.height
|
||||
self.queue = asyncio.Queue()
|
||||
self.queue_size = 0
|
||||
self.recent_sizes = [0]
|
||||
|
||||
def flush_db(self):
|
||||
self.db.flush(self.daemon_height, True)
|
||||
self.db.flush(self.daemon.cached_height(), True)
|
||||
|
||||
async def process_blocks(self):
|
||||
try:
|
||||
@ -179,13 +178,13 @@ class BlockCache(LoggedClass):
|
||||
blocks, total_size = await self.queue.get()
|
||||
self.queue_size -= total_size
|
||||
for block in blocks:
|
||||
self.db.process_block(block, self.daemon_height)
|
||||
self.db.process_block(block, self.daemon.cached_height())
|
||||
# Release asynchronous block fetching
|
||||
await asyncio.sleep(0)
|
||||
|
||||
if self.db.height == self.daemon_height:
|
||||
if self.db.height == self.daemon.cached_height():
|
||||
self.logger.info('caught up to height {:d}'
|
||||
.format(self.daemon_height))
|
||||
.format(self.db_height))
|
||||
self.flush_db()
|
||||
finally:
|
||||
self.flush_db()
|
||||
@ -210,26 +209,19 @@ class BlockCache(LoggedClass):
|
||||
|
||||
async def maybe_prefetch(self):
|
||||
'''Prefetch blocks if there are any to prefetch.'''
|
||||
daemon = self.daemon
|
||||
while self.queue_size < self.target_cache_size:
|
||||
# Keep going by getting a whole new cache_limit of blocks
|
||||
self.daemon_height = await daemon.send_single('getblockcount')
|
||||
max_count = min(self.daemon_height - self.fetched_height, 4000)
|
||||
daemon_height = await self.daemon.height()
|
||||
max_count = min(daemon_height - self.fetched_height, 4000)
|
||||
count = min(max_count, self.prefill_count(self.target_cache_size))
|
||||
if not count:
|
||||
break
|
||||
|
||||
first = self.fetched_height + 1
|
||||
param_lists = [[height] for height in range(first, first + count)]
|
||||
hashes = await daemon.send_vector('getblockhash', param_lists)
|
||||
hashes = await self.daemon.block_hex_hashes(first, count)
|
||||
blocks = await self.daemon.raw_blocks(hashes)
|
||||
|
||||
# Hashes is an array of hex strings
|
||||
param_lists = [(h, False) for h in hashes]
|
||||
blocks = await daemon.send_vector('getblock', param_lists)
|
||||
self.fetched_height += count
|
||||
|
||||
# Convert hex string to bytes
|
||||
blocks = [bytes.fromhex(block) for block in blocks]
|
||||
sizes = [len(block) for block in blocks]
|
||||
total_size = sum(sizes)
|
||||
self.queue.put_nowait((blocks, total_size))
|
||||
|
||||
@ -23,6 +23,7 @@ class Daemon(LoggedClass):
|
||||
def __init__(self, url):
|
||||
super().__init__()
|
||||
self.url = url
|
||||
self._height = None
|
||||
self.logger.info('connecting to daemon at URL {}'.format(url))
|
||||
|
||||
async def send_single(self, method, params=None):
|
||||
@ -33,14 +34,18 @@ class Daemon(LoggedClass):
|
||||
return result
|
||||
|
||||
async def send_many(self, mp_pairs):
|
||||
payload = [{'method': method, 'params': params}
|
||||
for method, params in mp_pairs]
|
||||
return await self.send(payload)
|
||||
if mp_pairs:
|
||||
payload = [{'method': method, 'params': params}
|
||||
for method, params in mp_pairs]
|
||||
return await self.send(payload)
|
||||
return []
|
||||
|
||||
async def send_vector(self, method, params_list):
|
||||
payload = [{'method': method, 'params': params}
|
||||
for params in params_list]
|
||||
return await self.send(payload)
|
||||
if params_list:
|
||||
payload = [{'method': method, 'params': params}
|
||||
for params in params_list]
|
||||
return await self.send(payload)
|
||||
return []
|
||||
|
||||
async def send(self, payload):
|
||||
assert isinstance(payload, (tuple, list))
|
||||
@ -68,3 +73,26 @@ class Daemon(LoggedClass):
|
||||
self.logger.error('{}. Sleeping {:d}s and trying again...'
|
||||
.format(msg, secs))
|
||||
await asyncio.sleep(secs)
|
||||
|
||||
async def block_hex_hashes(self, first, count):
|
||||
'''Return the hex hashes of count block starting at height first.'''
|
||||
param_lists = [[height] for height in range(first, first + count)]
|
||||
return await self.send_vector('getblockhash', param_lists)
|
||||
|
||||
async def raw_blocks(self, hex_hashes):
|
||||
'''Return the raw binary blocks with the given hex hashes.'''
|
||||
param_lists = [(h, False) for h in hex_hashes]
|
||||
blocks = await self.send_vector('getblock', param_lists)
|
||||
# Convert hex string to bytes
|
||||
return [bytes.fromhex(block) for block in blocks]
|
||||
|
||||
async def height(self):
|
||||
'''Query the daemon for its current height.'''
|
||||
self._height = await self.send_single('getblockcount')
|
||||
return self._height
|
||||
|
||||
def cached_height(self):
|
||||
'''Return the cached daemon height.
|
||||
|
||||
If the daemon has not been queried yet this returns None.'''
|
||||
return self._height
|
||||
|
||||
Loading…
Reference in New Issue
Block a user