diff --git a/electrumx_server.py b/electrumx_server.py index a749949..7129a8a 100755 --- a/electrumx_server.py +++ b/electrumx_server.py @@ -41,7 +41,7 @@ def main_loop(): def on_exception(loop, context): '''Suppress spurious messages it appears we cannot control.''' message = context.get('message') - if not loop.is_closed() and not message in SUPPRESS_MESSAGES: + if not message in SUPPRESS_MESSAGES: if not ('task' in context and 'accept_connection2()' in repr(context.get('task'))): loop.default_exception_handler(context) diff --git a/server/protocol.py b/server/protocol.py index d714ded..4df4fde 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -351,8 +351,13 @@ class ServerManager(util.LoggedClass): self.logger.info('server listening sockets closed, waiting ' '{:d} seconds for socket cleanup'.format(secs)) limit = time.time() + secs - while self.sessions and time.time() < limit: - await asyncio.sleep(4) + while self.sessions: + if time.time() < limit: + await asyncio.sleep(4) + else: + for session in list(self.sessions): + self.close_session(session, hard=True) + await asyncio.sleep(0) self.logger.info('{:,d} sessions remaining' .format(len(self.sessions))) @@ -368,14 +373,20 @@ class ServerManager(util.LoggedClass): self.close_session(session) def remove_session(self, session): - self.subscription_count -= session.sub_count() - future = self.sessions.pop(session) - future.cancel() + # It might have been forcefully removed earlier by close_session() + if session in self.sessions: + self.subscription_count -= session.sub_count() + future = self.sessions.pop(session) + future.cancel() - def close_session(self, session): + def close_session(self, session, hard=False): '''Close the session's transport and cancel its future.''' session.transport.close() self.sessions[session].cancel() + if hard: + self.remove_session(session) + socket = session.transport.get_extra_info('socket') + socket.close() return 'disconnected {:d}'.format(session.id_) def toggle_logging(self, session): @@ -392,10 +403,10 @@ class ServerManager(util.LoggedClass): stale = [session for session in self.sessions if session.last_recv < cutoff] for session in stale: - self.close_session(session) + self.close_session(session, hard=True) if stale: - self.logger.info('dropped {:,d} stale connections' - .format(len(stale))) + self.logger.info('dropped stale connections {}' + .format([session.id_ for session in stale])) def new_subscription(self): if self.subscription_count >= self.max_subs: