Don't start processing mempool until caught up
Print server manager settings once servers start
This commit is contained in:
parent
7b8119d787
commit
e91f49101b
@ -77,7 +77,7 @@ class Prefetcher(LoggedClass):
|
|||||||
self.refill_event.set()
|
self.refill_event.set()
|
||||||
return blocks
|
return blocks
|
||||||
|
|
||||||
async def main_loop(self):
|
async def main_loop(self, caught_up_event):
|
||||||
'''Loop forever polling for more blocks.'''
|
'''Loop forever polling for more blocks.'''
|
||||||
daemon_height = await self.daemon.height()
|
daemon_height = await self.daemon.height()
|
||||||
if self.fetched_height >= daemon_height:
|
if self.fetched_height >= daemon_height:
|
||||||
@ -89,7 +89,7 @@ class Prefetcher(LoggedClass):
|
|||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
with await self.semaphore:
|
with await self.semaphore:
|
||||||
await self._prefetch_blocks()
|
await self._prefetch_blocks(caught_up_event.is_set())
|
||||||
await self.refill_event.wait()
|
await self.refill_event.wait()
|
||||||
except DaemonError as e:
|
except DaemonError as e:
|
||||||
self.logger.info('ignoring daemon error: {}'.format(e))
|
self.logger.info('ignoring daemon error: {}'.format(e))
|
||||||
@ -97,13 +97,13 @@ class Prefetcher(LoggedClass):
|
|||||||
await self.clear(-1)
|
await self.clear(-1)
|
||||||
return
|
return
|
||||||
|
|
||||||
async def _prefetch_blocks(self):
|
async def _prefetch_blocks(self, mempool):
|
||||||
'''Prefetch some blocks and put them on the queue.
|
'''Prefetch some blocks and put them on the queue.
|
||||||
|
|
||||||
Repeats until the queue is full or caught up. If caught up,
|
Repeats until the queue is full or caught up. If caught up,
|
||||||
sleep for a period of time before returning.
|
sleep for a period of time before returning.
|
||||||
'''
|
'''
|
||||||
daemon_height = await self.daemon.height(mempool=self.caught_up)
|
daemon_height = await self.daemon.height(mempool)
|
||||||
while self.cache_size < self.min_cache_size:
|
while self.cache_size < self.min_cache_size:
|
||||||
# Try and catch up all blocks but limit to room in cache.
|
# Try and catch up all blocks but limit to room in cache.
|
||||||
# Constrain fetch count to between 0 and 2500 regardless.
|
# Constrain fetch count to between 0 and 2500 regardless.
|
||||||
|
|||||||
@ -79,16 +79,6 @@ class ServerManager(util.LoggedClass):
|
|||||||
self.futures = []
|
self.futures = []
|
||||||
env.max_send = max(350000, env.max_send)
|
env.max_send = max(350000, env.max_send)
|
||||||
self.setup_bands()
|
self.setup_bands()
|
||||||
self.logger.info('max session count: {:,d}'.format(self.max_sessions))
|
|
||||||
self.logger.info('session timeout: {:,d} seconds'
|
|
||||||
.format(env.session_timeout))
|
|
||||||
self.logger.info('session bandwidth limit {:,d} bytes'
|
|
||||||
.format(env.bandwidth_limit))
|
|
||||||
self.logger.info('max response size {:,d} bytes'.format(env.max_send))
|
|
||||||
self.logger.info('max subscriptions across all sessions: {:,d}'
|
|
||||||
.format(self.max_subs))
|
|
||||||
self.logger.info('max subscriptions per session: {:,d}'
|
|
||||||
.format(env.max_session_subs))
|
|
||||||
|
|
||||||
async def mempool_transactions(self, hash168):
|
async def mempool_transactions(self, hash168):
|
||||||
'''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
|
'''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
|
||||||
@ -186,7 +176,7 @@ class ServerManager(util.LoggedClass):
|
|||||||
|
|
||||||
# shutdown() assumes bp.main_loop() is first
|
# shutdown() assumes bp.main_loop() is first
|
||||||
add_future(self.bp.main_loop(self.mempool.touched))
|
add_future(self.bp.main_loop(self.mempool.touched))
|
||||||
add_future(self.bp.prefetcher.main_loop())
|
add_future(self.bp.prefetcher.main_loop(self.bp.caught_up_event))
|
||||||
add_future(self.irc.start(self.bp.caught_up_event))
|
add_future(self.irc.start(self.bp.caught_up_event))
|
||||||
add_future(self.start_servers(self.bp.caught_up_event))
|
add_future(self.start_servers(self.bp.caught_up_event))
|
||||||
add_future(self.mempool.main_loop())
|
add_future(self.mempool.main_loop())
|
||||||
@ -231,6 +221,17 @@ class ServerManager(util.LoggedClass):
|
|||||||
if self.env.rpc_port is not None:
|
if self.env.rpc_port is not None:
|
||||||
await self.start_server('RPC', 'localhost', self.env.rpc_port)
|
await self.start_server('RPC', 'localhost', self.env.rpc_port)
|
||||||
await caught_up.wait()
|
await caught_up.wait()
|
||||||
|
self.logger.info('max session count: {:,d}'.format(self.max_sessions))
|
||||||
|
self.logger.info('session timeout: {:,d} seconds'
|
||||||
|
.format(self.env.session_timeout))
|
||||||
|
self.logger.info('session bandwidth limit {:,d} bytes'
|
||||||
|
.format(self.env.bandwidth_limit))
|
||||||
|
self.logger.info('max response size {:,d} bytes'
|
||||||
|
.format(self.env.max_send))
|
||||||
|
self.logger.info('max subscriptions across all sessions: {:,d}'
|
||||||
|
.format(self.max_subs))
|
||||||
|
self.logger.info('max subscriptions per session: {:,d}'
|
||||||
|
.format(self.env.max_session_subs))
|
||||||
await self.start_external_servers()
|
await self.start_external_servers()
|
||||||
|
|
||||||
async def start_external_servers(self):
|
async def start_external_servers(self):
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user