Various fixes

- Fix pycodestyle
- Change session.close API
- Fix logging
This commit is contained in:
Neil Booth 2018-08-05 10:46:10 +09:00
parent 4c2834f899
commit 51c9988b81
2 changed files with 21 additions and 21 deletions

View File

@ -43,7 +43,7 @@ class PeerSession(ClientSession):
async def handle_request(self, request):
# We subscribe so might be unlucky enough to get a notification...
if (isinstance(request, Notification) and
request.method == 'blockchain.headers.subscribe'):
request.method == 'blockchain.headers.subscribe'):
pass
else:
await handler_invocation(None, request) # Raises

View File

@ -248,12 +248,9 @@ class SessionManager(object):
for session in stale_sessions)
self.logger.info(f'closing stale connections {text}')
# Give the sockets some time to close gracefully
async with ignore_after(20):
async with TaskGroup() as group:
for session in stale_sessions:
group.spawn(session.close())
for session in stale_sessions:
session.abort()
async with TaskGroup() as group:
for session in stale_sessions:
await group.spawn(session.close(force_after=30))
# Consolidate small groups
bw_limit = self.env.bandwidth_limit
@ -340,9 +337,7 @@ class SessionManager(object):
'''
async def close(session):
'''Close the session's transport.'''
async with ignore_after(2):
await session.close()
session.abort()
await session.close(force_after=2)
return f'disconnected {session.session_id}'
return await self._for_each_session(session_ids, close)
@ -439,10 +434,9 @@ class SessionManager(object):
# Close servers and sessions
self.state = self.SHUTTING_DOWN
await self._close_servers(list(self.servers.keys()))
for session in self.sessions:
session.abort()
for session in list(self.sessions):
await session.close()
with TaskGroup() as group:
for session in list(self.sessions):
await group.spawn(await session.close(force_after=0.1))
def session_count(self):
'''The number of connections that we've sent something to.'''
@ -507,7 +501,9 @@ class SessionBase(ServerSession):
self.txs_sent = 0
self.log_me = False
self.bw_limit = self.env.bandwidth_limit
self._crm_original = self.connection.receive_message
# Hijack the connection so we can log messages
self._receive_message_orig = self.connection.receive_message
self.connection.receive_message = self.receive_message
async def notify(self, height, touched):
pass
@ -520,15 +516,12 @@ class SessionBase(ServerSession):
return super().peer_address_str()
def receive_message(self, message):
self.logger.info(f'processing {message}')
self._crm_original(message)
if self.log_me:
self.logger.info(f'processing {message}')
return self._receive_message_orig(message)
def toggle_logging(self):
self.log_me = not self.log_me
if self.log_me:
self.connection.receive_message = self.receive_message
else:
self.connection.receive_message = self._crm_original
def flags(self):
'''Status flags.'''
@ -634,6 +627,13 @@ class ElectrumX(SessionBase):
def protocol_version_string(self):
return util.version_string(self.protocol_tuple)
# FIXME: make this the aiorpcx API for version 0.7
async def close(self, force_after=30):
'''Close the connection and return when closed.'''
async with ignore_after(force_after):
super().close()
self.abort()
async def daemon_request(self, method, *args):
'''Catch a DaemonError and convert it to an RPCError.'''
try: