From 51c9988b81e65b7be58d069c3515320c6ede88e7 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sun, 5 Aug 2018 10:46:10 +0900 Subject: [PATCH] Various fixes - Fix pycodestyle - Change session.close API - Fix logging --- electrumx/server/peers.py | 2 +- electrumx/server/session.py | 40 ++++++++++++++++++------------------- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/electrumx/server/peers.py b/electrumx/server/peers.py index abcee0b..00d1ea5 100644 --- a/electrumx/server/peers.py +++ b/electrumx/server/peers.py @@ -43,7 +43,7 @@ class PeerSession(ClientSession): async def handle_request(self, request): # We subscribe so might be unlucky enough to get a notification... if (isinstance(request, Notification) and - request.method == 'blockchain.headers.subscribe'): + request.method == 'blockchain.headers.subscribe'): pass else: await handler_invocation(None, request) # Raises diff --git a/electrumx/server/session.py b/electrumx/server/session.py index e64fbdb..ecbdb2b 100644 --- a/electrumx/server/session.py +++ b/electrumx/server/session.py @@ -248,12 +248,9 @@ class SessionManager(object): for session in stale_sessions) self.logger.info(f'closing stale connections {text}') # Give the sockets some time to close gracefully - async with ignore_after(20): - async with TaskGroup() as group: - for session in stale_sessions: - group.spawn(session.close()) - for session in stale_sessions: - session.abort() + async with TaskGroup() as group: + for session in stale_sessions: + await group.spawn(session.close(force_after=30)) # Consolidate small groups bw_limit = self.env.bandwidth_limit @@ -340,9 +337,7 @@ class SessionManager(object): ''' async def close(session): '''Close the session's transport.''' - async with ignore_after(2): - await session.close() - session.abort() + await session.close(force_after=2) return f'disconnected {session.session_id}' return await self._for_each_session(session_ids, close) @@ -439,10 +434,9 @@ class SessionManager(object): # Close servers and sessions self.state = self.SHUTTING_DOWN await self._close_servers(list(self.servers.keys())) - for session in self.sessions: - session.abort() - for session in list(self.sessions): - await session.close() + with TaskGroup() as group: + for session in list(self.sessions): + await group.spawn(await session.close(force_after=0.1)) def session_count(self): '''The number of connections that we've sent something to.''' @@ -507,7 +501,9 @@ class SessionBase(ServerSession): self.txs_sent = 0 self.log_me = False self.bw_limit = self.env.bandwidth_limit - self._crm_original = self.connection.receive_message + # Hijack the connection so we can log messages + self._receive_message_orig = self.connection.receive_message + self.connection.receive_message = self.receive_message async def notify(self, height, touched): pass @@ -520,15 +516,12 @@ class SessionBase(ServerSession): return super().peer_address_str() def receive_message(self, message): - self.logger.info(f'processing {message}') - self._crm_original(message) + if self.log_me: + self.logger.info(f'processing {message}') + return self._receive_message_orig(message) def toggle_logging(self): self.log_me = not self.log_me - if self.log_me: - self.connection.receive_message = self.receive_message - else: - self.connection.receive_message = self._crm_original def flags(self): '''Status flags.''' @@ -634,6 +627,13 @@ class ElectrumX(SessionBase): def protocol_version_string(self): return util.version_string(self.protocol_tuple) + # FIXME: make this the aiorpcx API for version 0.7 + async def close(self, force_after=30): + '''Close the connection and return when closed.''' + async with ignore_after(force_after): + super().close() + self.abort() + async def daemon_request(self, method, *args): '''Catch a DaemonError and convert it to an RPCError.''' try: