Handle closing sessions a little differently
Move connection type to first flag letter. All seeing eye is not stale.
This commit is contained in:
parent
fcf696d3f3
commit
79a9e91994
@ -373,13 +373,8 @@ class ServerManager(util.LoggedClass):
|
|||||||
self.logger.info('server listening sockets closed, waiting '
|
self.logger.info('server listening sockets closed, waiting '
|
||||||
'{:d} seconds for socket cleanup'.format(secs))
|
'{:d} seconds for socket cleanup'.format(secs))
|
||||||
limit = time.time() + secs
|
limit = time.time() + secs
|
||||||
while self.sessions:
|
while self.sessions and time.time() < limit:
|
||||||
if time.time() < limit:
|
await asyncio.sleep(4)
|
||||||
await asyncio.sleep(4)
|
|
||||||
else:
|
|
||||||
for session in list(self.sessions):
|
|
||||||
self.close_session(session, hard=True)
|
|
||||||
await asyncio.sleep(0)
|
|
||||||
self.logger.info('{:,d} sessions remaining'
|
self.logger.info('{:,d} sessions remaining'
|
||||||
.format(len(self.sessions)))
|
.format(len(self.sessions)))
|
||||||
|
|
||||||
@ -401,14 +396,10 @@ class ServerManager(util.LoggedClass):
|
|||||||
future = self.sessions.pop(session)
|
future = self.sessions.pop(session)
|
||||||
future.cancel()
|
future.cancel()
|
||||||
|
|
||||||
def close_session(self, session, hard=False):
|
def close_session(self, session):
|
||||||
'''Close the session's transport and cancel its future.'''
|
'''Close the session's transport and cancel its future.'''
|
||||||
session.transport.close()
|
session.transport.close()
|
||||||
self.sessions[session].cancel()
|
self.sessions[session].cancel()
|
||||||
if hard:
|
|
||||||
self.remove_session(session)
|
|
||||||
socket = session.transport.get_extra_info('socket')
|
|
||||||
socket.close()
|
|
||||||
return 'disconnected {:d}'.format(session.id_)
|
return 'disconnected {:d}'.format(session.id_)
|
||||||
|
|
||||||
def toggle_logging(self, session):
|
def toggle_logging(self, session):
|
||||||
@ -423,11 +414,13 @@ class ServerManager(util.LoggedClass):
|
|||||||
self.next_stale_check = now + 60
|
self.next_stale_check = now + 60
|
||||||
cutoff = now - self.env.session_timeout
|
cutoff = now - self.env.session_timeout
|
||||||
stale = [session for session in self.sessions
|
stale = [session for session in self.sessions
|
||||||
if session.last_recv < cutoff]
|
if session.last_recv < cutoff
|
||||||
|
and session.client != 'all_seeing_eye'
|
||||||
|
and not session.is_closing()]
|
||||||
for session in stale:
|
for session in stale:
|
||||||
self.close_session(session, hard=True)
|
self.close_session(session)
|
||||||
if stale:
|
if stale:
|
||||||
self.logger.info('dropped stale connections {}'
|
self.logger.info('closing stale connections {}'
|
||||||
.format([session.id_ for session in stale]))
|
.format([session.id_ for session in stale]))
|
||||||
|
|
||||||
def new_subscription(self):
|
def new_subscription(self):
|
||||||
@ -441,12 +434,13 @@ class ServerManager(util.LoggedClass):
|
|||||||
|
|
||||||
def session_count(self):
|
def session_count(self):
|
||||||
'''The number of connections that we've sent something to.'''
|
'''The number of connections that we've sent something to.'''
|
||||||
return len([s for s in self.sessions if s.send_count])
|
return len(self.sessions)
|
||||||
|
|
||||||
def server_summary(self):
|
def server_summary(self):
|
||||||
'''A one-line summary of server state.'''
|
'''A one-line summary of server state.'''
|
||||||
return {
|
return {
|
||||||
'blocks': self.bp.db_height,
|
'blocks': self.bp.db_height,
|
||||||
|
'closing': len([s for s in self.sessions if s.is_closing()]),
|
||||||
'errors': sum(s.error_count for s in self.sessions),
|
'errors': sum(s.error_count for s in self.sessions),
|
||||||
'peers': len(self.irc.peers),
|
'peers': len(self.irc.peers),
|
||||||
'sessions': self.session_count(),
|
'sessions': self.session_count(),
|
||||||
@ -465,15 +459,14 @@ class ServerManager(util.LoggedClass):
|
|||||||
return ('{:3d}:{:02d}:{:02d}'
|
return ('{:3d}:{:02d}:{:02d}'
|
||||||
.format(t // 3600, (t % 3600) // 60, t % 60))
|
.format(t // 3600, (t % 3600) // 60, t % 60))
|
||||||
|
|
||||||
fmt = ('{:<4} {:>7} {:>23} {:>15} {:>7} '
|
fmt = ('{:<6} {:<3} {:>23} {:>15} {:>7} '
|
||||||
'{:>7} {:>7} {:>7} {:>7} {:>5} {:>9}')
|
'{:>7} {:>7} {:>7} {:>7} {:>5} {:>9}')
|
||||||
yield fmt.format('Type', 'ID ', 'Peer', 'Client', 'Subs',
|
yield fmt.format('ID', 'Flg', 'Peer', 'Client', 'Subs',
|
||||||
'Recv', 'Recv KB', 'Sent', 'Sent KB',
|
'Recv', 'Recv KB', 'Sent', 'Sent KB',
|
||||||
'Txs', 'Time')
|
'Txs', 'Time')
|
||||||
for (kind, id_, log_me, peer, subs, client, recv_count, recv_size,
|
for (id_, flags, peer, subs, client, recv_count, recv_size,
|
||||||
send_count, send_size, txs_sent, time) in data:
|
send_count, send_size, txs_sent, time) in data:
|
||||||
yield fmt.format(kind, str(id_) + ('L' if log_me else ' '),
|
yield fmt.format(id_, flags, peer, client,
|
||||||
peer, client,
|
|
||||||
'{:,d}'.format(subs),
|
'{:,d}'.format(subs),
|
||||||
'{:,d}'.format(recv_count),
|
'{:,d}'.format(recv_count),
|
||||||
'{:,d}'.format(recv_size // 1024),
|
'{:,d}'.format(recv_size // 1024),
|
||||||
@ -486,9 +479,8 @@ class ServerManager(util.LoggedClass):
|
|||||||
'''Returned to the RPC 'sessions' call.'''
|
'''Returned to the RPC 'sessions' call.'''
|
||||||
now = time.time()
|
now = time.time()
|
||||||
sessions = sorted(self.sessions.keys(), key=lambda s: s.start)
|
sessions = sorted(self.sessions.keys(), key=lambda s: s.start)
|
||||||
return [(session.kind,
|
return [(session.id_,
|
||||||
session.id_,
|
session.flags(),
|
||||||
session.log_me,
|
|
||||||
session.peername(for_log=for_log),
|
session.peername(for_log=for_log),
|
||||||
session.sub_count(),
|
session.sub_count(),
|
||||||
session.client,
|
session.client,
|
||||||
@ -563,6 +555,19 @@ class Session(JSONRPC):
|
|||||||
self.bandwidth_limit = env.bandwidth_limit
|
self.bandwidth_limit = env.bandwidth_limit
|
||||||
self.txs_sent = 0
|
self.txs_sent = 0
|
||||||
|
|
||||||
|
def is_closing(self):
|
||||||
|
'''True if this session is closing.'''
|
||||||
|
return self.transport and self.transport.is_closing()
|
||||||
|
|
||||||
|
def flags(self):
|
||||||
|
'''Status flags.'''
|
||||||
|
status = self.kind[0]
|
||||||
|
if self.is_closing():
|
||||||
|
status += 'C'
|
||||||
|
if self.log_me:
|
||||||
|
status += 'L'
|
||||||
|
return status
|
||||||
|
|
||||||
def connection_made(self, transport):
|
def connection_made(self, transport):
|
||||||
'''Handle an incoming client connection.'''
|
'''Handle an incoming client connection.'''
|
||||||
super().connection_made(transport)
|
super().connection_made(transport)
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user