Forcefully drop stale sessions or if shutting down
Don't wait for the socket
This commit is contained in:
parent
aaf0592f52
commit
04369dd228
@ -41,7 +41,7 @@ def main_loop():
|
|||||||
def on_exception(loop, context):
|
def on_exception(loop, context):
|
||||||
'''Suppress spurious messages it appears we cannot control.'''
|
'''Suppress spurious messages it appears we cannot control.'''
|
||||||
message = context.get('message')
|
message = context.get('message')
|
||||||
if not loop.is_closed() and not message in SUPPRESS_MESSAGES:
|
if not message in SUPPRESS_MESSAGES:
|
||||||
if not ('task' in context and
|
if not ('task' in context and
|
||||||
'accept_connection2()' in repr(context.get('task'))):
|
'accept_connection2()' in repr(context.get('task'))):
|
||||||
loop.default_exception_handler(context)
|
loop.default_exception_handler(context)
|
||||||
|
|||||||
@ -351,8 +351,13 @@ class ServerManager(util.LoggedClass):
|
|||||||
self.logger.info('server listening sockets closed, waiting '
|
self.logger.info('server listening sockets closed, waiting '
|
||||||
'{:d} seconds for socket cleanup'.format(secs))
|
'{:d} seconds for socket cleanup'.format(secs))
|
||||||
limit = time.time() + secs
|
limit = time.time() + secs
|
||||||
while self.sessions and time.time() < limit:
|
while self.sessions:
|
||||||
await asyncio.sleep(4)
|
if time.time() < limit:
|
||||||
|
await asyncio.sleep(4)
|
||||||
|
else:
|
||||||
|
for session in list(self.sessions):
|
||||||
|
self.close_session(session, hard=True)
|
||||||
|
await asyncio.sleep(0)
|
||||||
self.logger.info('{:,d} sessions remaining'
|
self.logger.info('{:,d} sessions remaining'
|
||||||
.format(len(self.sessions)))
|
.format(len(self.sessions)))
|
||||||
|
|
||||||
@ -368,14 +373,20 @@ class ServerManager(util.LoggedClass):
|
|||||||
self.close_session(session)
|
self.close_session(session)
|
||||||
|
|
||||||
def remove_session(self, session):
|
def remove_session(self, session):
|
||||||
self.subscription_count -= session.sub_count()
|
# It might have been forcefully removed earlier by close_session()
|
||||||
future = self.sessions.pop(session)
|
if session in self.sessions:
|
||||||
future.cancel()
|
self.subscription_count -= session.sub_count()
|
||||||
|
future = self.sessions.pop(session)
|
||||||
|
future.cancel()
|
||||||
|
|
||||||
def close_session(self, session):
|
def close_session(self, session, hard=False):
|
||||||
'''Close the session's transport and cancel its future.'''
|
'''Close the session's transport and cancel its future.'''
|
||||||
session.transport.close()
|
session.transport.close()
|
||||||
self.sessions[session].cancel()
|
self.sessions[session].cancel()
|
||||||
|
if hard:
|
||||||
|
self.remove_session(session)
|
||||||
|
socket = session.transport.get_extra_info('socket')
|
||||||
|
socket.close()
|
||||||
return 'disconnected {:d}'.format(session.id_)
|
return 'disconnected {:d}'.format(session.id_)
|
||||||
|
|
||||||
def toggle_logging(self, session):
|
def toggle_logging(self, session):
|
||||||
@ -392,10 +403,10 @@ class ServerManager(util.LoggedClass):
|
|||||||
stale = [session for session in self.sessions
|
stale = [session for session in self.sessions
|
||||||
if session.last_recv < cutoff]
|
if session.last_recv < cutoff]
|
||||||
for session in stale:
|
for session in stale:
|
||||||
self.close_session(session)
|
self.close_session(session, hard=True)
|
||||||
if stale:
|
if stale:
|
||||||
self.logger.info('dropped {:,d} stale connections'
|
self.logger.info('dropped stale connections {}'
|
||||||
.format(len(stale)))
|
.format([session.id_ for session in stale]))
|
||||||
|
|
||||||
def new_subscription(self):
|
def new_subscription(self):
|
||||||
if self.subscription_count >= self.max_subs:
|
if self.subscription_count >= self.max_subs:
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user