Clean up controller init, and sync on catch up
This commit is contained in:
parent
208ed5d2b2
commit
1084060493
@ -54,23 +54,28 @@ class Controller(LoggedClass):
|
||||
.format(env.host, env.ssl_port))
|
||||
|
||||
coros = [
|
||||
self.block_cache.catch_up(),
|
||||
self.block_cache.process_cache()
|
||||
self.block_cache.prefetcher(),
|
||||
self.block_cache.process_blocks(),
|
||||
]
|
||||
|
||||
self.tasks = [asyncio.ensure_future(coro) for coro in coros]
|
||||
for coro in coros:
|
||||
asyncio.ensure_future(coro)
|
||||
|
||||
# Signal handlers
|
||||
for signame in ('SIGINT', 'SIGTERM'):
|
||||
loop.add_signal_handler(getattr(signal, signame),
|
||||
partial(self.on_signal, signame))
|
||||
|
||||
return self.tasks
|
||||
partial(self.on_signal, loop, signame))
|
||||
|
||||
def stop(self):
|
||||
for server in self.servers:
|
||||
server.close()
|
||||
|
||||
def on_signal(self, loop, signame):
|
||||
self.logger.warning('received {} signal, preparing to shut down'
|
||||
.format(signame))
|
||||
for task in asyncio.Task.all_tasks(loop):
|
||||
task.cancel()
|
||||
|
||||
def add_session(self, session):
|
||||
self.sessions.add(session)
|
||||
|
||||
@ -97,12 +102,6 @@ class Controller(LoggedClass):
|
||||
self.jobs = jobs
|
||||
await asyncio.sleep(5)
|
||||
|
||||
def on_signal(self, signame):
|
||||
self.logger.warning('received {} signal, preparing to shut down'
|
||||
.format(signame))
|
||||
for task in self.tasks:
|
||||
task.cancel()
|
||||
|
||||
def address_status(self, hash168):
|
||||
'''Returns status as 32 bytes.'''
|
||||
status = self.addresses.get(hash168)
|
||||
@ -122,70 +121,70 @@ class Controller(LoggedClass):
|
||||
per peer.'''
|
||||
return self.peers
|
||||
|
||||
|
||||
class BlockCache(LoggedClass):
|
||||
'''Requests blocks ahead of time from the daemon. Serves them
|
||||
to the blockchain processor.'''
|
||||
'''Requests and caches blocks ahead of time from the daemon. Serves
|
||||
them to the blockchain processor. Coordinates backing up in case of
|
||||
block chain reorganisations.
|
||||
'''
|
||||
|
||||
def __init__(self, env, db):
|
||||
super().__init__()
|
||||
self.db = db
|
||||
self.daemon_url = env.daemon_url
|
||||
# Cache target size is in MB. Has little effect on sync time.
|
||||
self.cache_limit = 10
|
||||
# Target cache size. Has little effect on sync time.
|
||||
self.target_cache_size = 10 * 1024 * 1024
|
||||
self.daemon_height = 0
|
||||
self.fetched_height = db.height
|
||||
# Blocks stored in reverse order. Next block is at end of list.
|
||||
self.blocks = []
|
||||
self.recent_sizes = []
|
||||
self.ave_size = 0
|
||||
self.queue = asyncio.Queue()
|
||||
self.queue_size = 0
|
||||
self.recent_sizes = [0]
|
||||
|
||||
self.logger.info('using daemon URL {}'.format(self.daemon_url))
|
||||
|
||||
async def process_cache(self):
|
||||
while True:
|
||||
await asyncio.sleep(1)
|
||||
while self.blocks:
|
||||
self.db.process_block(self.blocks.pop(), self.daemon_height)
|
||||
# Release asynchronous block fetching
|
||||
await asyncio.sleep(0)
|
||||
|
||||
async def catch_up(self):
|
||||
self.logger.info('catching up, block cache limit {:d}MB...'
|
||||
.format(self.cache_limit))
|
||||
def flush_db(self):
|
||||
self.db.flush(self.daemon_height, True)
|
||||
|
||||
async def process_blocks(self):
|
||||
try:
|
||||
while await self.maybe_prefill():
|
||||
await asyncio.sleep(1)
|
||||
self.logger.info('caught up to height {:d}'
|
||||
.format(self.daemon_height))
|
||||
while True:
|
||||
blocks, total_size = await self.queue.get()
|
||||
self.queue_size -= total_size
|
||||
for block in blocks:
|
||||
self.db.process_block(block, self.daemon_height)
|
||||
# Release asynchronous block fetching
|
||||
await asyncio.sleep(0)
|
||||
|
||||
if self.db.height == self.daemon_height:
|
||||
self.logger.info('caught up to height {:d}'
|
||||
.format(self.daemon_height))
|
||||
self.flush_db()
|
||||
finally:
|
||||
self.db.flush(self.daemon_height, True)
|
||||
self.flush_db()
|
||||
|
||||
async def prefetcher(self):
|
||||
'''Loops forever polling for more blocks.'''
|
||||
self.logger.info('prefetching blocks...')
|
||||
while True:
|
||||
await self.maybe_prefetch()
|
||||
await asyncio.sleep(2)
|
||||
|
||||
def cache_used(self):
|
||||
return sum(len(block) for block in self.blocks)
|
||||
|
||||
def prefill_count(self, room):
|
||||
count = 0
|
||||
if self.ave_size:
|
||||
count = room // self.ave_size
|
||||
ave_size = sum(self.recent_sizes) // len(self.recent_sizes)
|
||||
count = room // ave_size if ave_size else 0
|
||||
return max(count, 10)
|
||||
|
||||
async def maybe_prefill(self):
|
||||
'''Returns False to stop. True to sleep a while for asynchronous
|
||||
processing.'''
|
||||
cache_limit = self.cache_limit * 1024 * 1024
|
||||
while True:
|
||||
cache_used = self.cache_used()
|
||||
if cache_used > cache_limit:
|
||||
return True
|
||||
|
||||
async def maybe_prefetch(self):
|
||||
'''Prefetch blocks if there are any to prefetch.'''
|
||||
while self.queue_size < self.target_cache_size:
|
||||
# Keep going by getting a whole new cache_limit of blocks
|
||||
self.daemon_height = await self.send_single('getblockcount')
|
||||
max_count = min(self.daemon_height - self.fetched_height, 4000)
|
||||
count = min(max_count, self.prefill_count(cache_limit))
|
||||
count = min(max_count, self.prefill_count(self.target_cache_size))
|
||||
if not count:
|
||||
return False # Done catching up
|
||||
break
|
||||
|
||||
first = self.fetched_height + 1
|
||||
param_lists = [[height] for height in range(first, first + count)]
|
||||
@ -198,16 +197,16 @@ class BlockCache(LoggedClass):
|
||||
|
||||
# Convert hex string to bytes
|
||||
blocks = [bytes.fromhex(block) for block in blocks]
|
||||
# Reverse order and place at front of list
|
||||
self.blocks = list(reversed(blocks)) + self.blocks
|
||||
sizes = [len(block) for block in blocks]
|
||||
total_size = sum(sizes)
|
||||
self.queue.put_nowait((blocks, total_size))
|
||||
self.queue_size += total_size
|
||||
|
||||
# Keep 50 most recent block sizes for fetch count estimation
|
||||
sizes = [len(block) for block in blocks]
|
||||
self.recent_sizes.extend(sizes)
|
||||
excess = len(self.recent_sizes) - 50
|
||||
if excess > 0:
|
||||
self.recent_sizes = self.recent_sizes[excess:]
|
||||
self.ave_size = sum(self.recent_sizes) // len(self.recent_sizes)
|
||||
|
||||
async def send_single(self, method, params=None):
|
||||
payload = {'method': method}
|
||||
|
||||
@ -26,7 +26,9 @@ def main_loop():
|
||||
#loop.set_debug(True)
|
||||
|
||||
controller = Controller(env)
|
||||
tasks = controller.start(loop)
|
||||
controller.start(loop)
|
||||
|
||||
tasks = asyncio.Task.all_tasks(loop)
|
||||
try:
|
||||
loop.run_until_complete(asyncio.gather(*tasks))
|
||||
except asyncio.CancelledError:
|
||||
|
||||
Loading…
Reference in New Issue
Block a user