Tweak task handling so all errors are logged
This commit is contained in:
parent
3fbd4992ce
commit
68dbf9fad2
@ -103,7 +103,7 @@ class ServerBase(object):
|
|||||||
partial(self.on_signal, signame))
|
partial(self.on_signal, signame))
|
||||||
loop.set_exception_handler(self.on_exception)
|
loop.set_exception_handler(self.on_exception)
|
||||||
|
|
||||||
self.tasks.create_task(self.start_servers())
|
await self.start_servers()
|
||||||
|
|
||||||
# Wait for shutdown to be signalled, and log it.
|
# Wait for shutdown to be signalled, and log it.
|
||||||
# Derived classes may want to provide a shutdown() coroutine.
|
# Derived classes may want to provide a shutdown() coroutine.
|
||||||
|
|||||||
@ -46,9 +46,20 @@ class Tasks(object):
|
|||||||
'''Run a function in a separate thread, and await its completion.'''
|
'''Run a function in a separate thread, and await its completion.'''
|
||||||
return await self.loop.run_in_executor(None, func, *args)
|
return await self.loop.run_in_executor(None, func, *args)
|
||||||
|
|
||||||
def create_task(self, coro):
|
def create_task(self, coro, daemon=True):
|
||||||
'''Schedule the coro to be run.'''
|
'''Schedule the coro to be run.'''
|
||||||
return self.tasks.create_task(coro)
|
task = self.tasks.create_task(coro)
|
||||||
|
if daemon:
|
||||||
|
task.add_done_callback(self._check_task_exception)
|
||||||
|
return task
|
||||||
|
|
||||||
|
def _check_task_exception(self, task):
|
||||||
|
'''Check a task for exceptions.'''
|
||||||
|
try:
|
||||||
|
if not task.cancelled():
|
||||||
|
task.result()
|
||||||
|
except Exception as e:
|
||||||
|
self.logger.exception(f'uncaught task exception: {e}')
|
||||||
|
|
||||||
async def cancel_all(self, wait=True):
|
async def cancel_all(self, wait=True):
|
||||||
'''Cancels all tasks and waits for them to complete.'''
|
'''Cancels all tasks and waits for them to complete.'''
|
||||||
|
|||||||
@ -111,10 +111,10 @@ class Controller(ServerBase):
|
|||||||
'''Start the RPC server and wait for the mempool to synchronize. Then
|
'''Start the RPC server and wait for the mempool to synchronize. Then
|
||||||
start the peer manager and serving external clients.
|
start the peer manager and serving external clients.
|
||||||
'''
|
'''
|
||||||
self.session_mgr.start_rpc_server()
|
await self.session_mgr.start_rpc_server()
|
||||||
await self.bp.catch_up_to_daemon()
|
await self.bp.catch_up_to_daemon()
|
||||||
await self.mempool.start_and_wait_for_sync()
|
await self.mempool.start_and_wait_for_sync()
|
||||||
self.session_mgr.start_serving()
|
await self.session_mgr.start_serving()
|
||||||
# Peer discovery should start after we start serving because
|
# Peer discovery should start after we start serving because
|
||||||
# we connect to ourself
|
# we connect to ourself
|
||||||
self.peer_mgr.start_peer_discovery()
|
self.peer_mgr.start_peer_discovery()
|
||||||
|
|||||||
@ -165,7 +165,8 @@ class MemPool(object):
|
|||||||
# Process new transactions
|
# Process new transactions
|
||||||
new_hashes = list(all_hashes.difference(txs))
|
new_hashes = list(all_hashes.difference(txs))
|
||||||
jobs = [self.tasks.create_task(self._fetch_and_accept
|
jobs = [self.tasks.create_task(self._fetch_and_accept
|
||||||
(hashes, all_hashes, touched))
|
(hashes, all_hashes, touched),
|
||||||
|
daemon=False)
|
||||||
for hashes in chunks(new_hashes, 2000)]
|
for hashes in chunks(new_hashes, 2000)]
|
||||||
if jobs:
|
if jobs:
|
||||||
await asyncio.gather(*jobs)
|
await asyncio.gather(*jobs)
|
||||||
|
|||||||
@ -194,9 +194,11 @@ class PeerManager(object):
|
|||||||
# Retry a failed connection if enough time has passed
|
# Retry a failed connection if enough time has passed
|
||||||
return peer.last_try < now - WAKEUP_SECS * 2 ** peer.try_count
|
return peer.last_try < now - WAKEUP_SECS * 2 ** peer.try_count
|
||||||
|
|
||||||
|
tasks = []
|
||||||
for peer in self.peers:
|
for peer in self.peers:
|
||||||
if should_retry(peer):
|
if should_retry(peer):
|
||||||
self.tasks.create_task(self._retry_peer(peer))
|
tasks.append(self.tasks.create_task(self._retry_peer(peer)))
|
||||||
|
await asyncio.gather(*tasks)
|
||||||
|
|
||||||
async def _retry_peer(self, peer):
|
async def _retry_peer(self, peer):
|
||||||
peer.try_count += 1
|
peer.try_count += 1
|
||||||
@ -275,7 +277,7 @@ class PeerManager(object):
|
|||||||
peer.features['server_version'] = server_version
|
peer.features['server_version'] = server_version
|
||||||
ptuple = protocol_tuple(protocol_version)
|
ptuple = protocol_tuple(protocol_version)
|
||||||
|
|
||||||
jobs = [self.tasks.create_task(message) for message in (
|
jobs = [self.tasks.create_task(message, daemon=False) for message in (
|
||||||
self._send_headers_subscribe(session, peer, timeout, ptuple),
|
self._send_headers_subscribe(session, peer, timeout, ptuple),
|
||||||
self._send_server_features(session, peer, timeout),
|
self._send_server_features(session, peer, timeout),
|
||||||
self._send_peers_subscribe(session, peer, timeout)
|
self._send_peers_subscribe(session, peer, timeout)
|
||||||
|
|||||||
@ -396,13 +396,13 @@ class SessionManager(object):
|
|||||||
|
|
||||||
# --- External Interface
|
# --- External Interface
|
||||||
|
|
||||||
def start_rpc_server(self):
|
async def start_rpc_server(self):
|
||||||
'''Start the RPC server if enabled.'''
|
'''Start the RPC server if enabled.'''
|
||||||
if self.env.rpc_port is not None:
|
if self.env.rpc_port is not None:
|
||||||
self.tasks.create_task(self._start_server(
|
await self._start_server('RPC', self.env.cs_host(for_rpc=True),
|
||||||
'RPC', self.env.cs_host(for_rpc=True), self.env.rpc_port))
|
self.env.rpc_port)
|
||||||
|
|
||||||
def start_serving(self):
|
async def start_serving(self):
|
||||||
'''Start TCP and SSL servers.'''
|
'''Start TCP and SSL servers.'''
|
||||||
self.logger.info('max session count: {:,d}'.format(self.max_sessions))
|
self.logger.info('max session count: {:,d}'.format(self.max_sessions))
|
||||||
self.logger.info('session timeout: {:,d} seconds'
|
self.logger.info('session timeout: {:,d} seconds'
|
||||||
@ -418,7 +418,7 @@ class SessionManager(object):
|
|||||||
if self.env.drop_client is not None:
|
if self.env.drop_client is not None:
|
||||||
self.logger.info('drop clients matching: {}'
|
self.logger.info('drop clients matching: {}'
|
||||||
.format(self.env.drop_client.pattern))
|
.format(self.env.drop_client.pattern))
|
||||||
self.tasks.create_task(self._start_external_servers())
|
await self._start_external_servers()
|
||||||
self.tasks.create_task(self._housekeeping())
|
self.tasks.create_task(self._housekeeping())
|
||||||
|
|
||||||
async def shutdown(self):
|
async def shutdown(self):
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user