Better RPC sessions stats
This commit is contained in:
parent
7523735f99
commit
831225492e
@ -44,12 +44,28 @@ class RPCClient(asyncio.Protocol):
|
|||||||
if error:
|
if error:
|
||||||
print("ERROR: {}".format(error))
|
print("ERROR: {}".format(error))
|
||||||
else:
|
else:
|
||||||
|
def data_fmt(count, size):
|
||||||
|
return '{:,d}/{:,d}KB'.format(count, size // 1024)
|
||||||
|
def time_fmt(t):
|
||||||
|
t = int(t)
|
||||||
|
return ('{:3d}:{:02d}:{:02d}'
|
||||||
|
.format(t // 3600, (t % 3600) // 60, t % 60))
|
||||||
|
|
||||||
if self.method == 'sessions':
|
if self.method == 'sessions':
|
||||||
fmt = '{:<4} {:>23} {:>7} {:>15} {:>7}'
|
fmt = ('{:<4} {:>23} {:>15} {:>5} '
|
||||||
print(fmt.format('Type', 'Peer', 'Subs', 'Client', 'Time'))
|
'{:>7} {:>7} {:>7} {:>7} {:>5} {:>9}')
|
||||||
for kind, peer, subs, client, time in result:
|
print(fmt.format('Type', 'Peer', 'Client', 'Subs',
|
||||||
print(fmt.format(kind, peer, '{:,d}'.format(subs),
|
'Snt #', 'Snt MB', 'Rcv #', 'Rcv MB',
|
||||||
client, '{:,d}'.format(int(time))))
|
'Errs', 'Time'))
|
||||||
|
for (kind, peer, subs, client, recv_count, recv_size,
|
||||||
|
send_count, send_size, error_count, time) in result:
|
||||||
|
print(fmt.format(kind, peer, client, '{:,d}'.format(subs),
|
||||||
|
'{:,d}'.format(recv_count),
|
||||||
|
'{:,.1f}'.format(recv_size / 1048576),
|
||||||
|
'{:,d}'.format(send_count),
|
||||||
|
'{:,.1f}'.format(send_size / 1048576),
|
||||||
|
'{:,d}'.format(error_count),
|
||||||
|
time_fmt(time)))
|
||||||
else:
|
else:
|
||||||
pprint.pprint(result, indent=4)
|
pprint.pprint(result, indent=4)
|
||||||
|
|
||||||
|
|||||||
@ -33,9 +33,13 @@ class BlockServer(BlockProcessor):
|
|||||||
def __init__(self, env):
|
def __init__(self, env):
|
||||||
super().__init__(env)
|
super().__init__(env)
|
||||||
self.server_mgr = ServerManager(self, env)
|
self.server_mgr = ServerManager(self, env)
|
||||||
|
self.bs_caught_up = False
|
||||||
|
|
||||||
async def caught_up(self, mempool_hashes):
|
async def caught_up(self, mempool_hashes):
|
||||||
await super().caught_up(mempool_hashes)
|
await super().caught_up(mempool_hashes)
|
||||||
|
if not self.bs_caught_up:
|
||||||
|
await self.server_mgr.start_servers()
|
||||||
|
self.bs_caught_up = True
|
||||||
self.server_mgr.notify(self.height, self.touched)
|
self.server_mgr.notify(self.height, self.touched)
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
@ -64,6 +68,7 @@ class ServerManager(LoggedClass):
|
|||||||
protocol = partial(protocol_class, self, self.bp, self.env, kind)
|
protocol = partial(protocol_class, self, self.bp, self.env, kind)
|
||||||
server = loop.create_server(protocol, *args, **kw_args)
|
server = loop.create_server(protocol, *args, **kw_args)
|
||||||
|
|
||||||
|
host, port = args[:2]
|
||||||
try:
|
try:
|
||||||
self.servers.append(await server)
|
self.servers.append(await server)
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
@ -103,17 +108,11 @@ class ServerManager(LoggedClass):
|
|||||||
else:
|
else:
|
||||||
self.logger.info('IRC disabled')
|
self.logger.info('IRC disabled')
|
||||||
|
|
||||||
async def notify(self, height, touched):
|
def notify(self, height, touched):
|
||||||
'''Notify electrum clients about height changes and touched addresses.
|
'''Notify sessions about height changes and touched addresses.'''
|
||||||
|
|
||||||
Start listening if not yet listening.
|
|
||||||
'''
|
|
||||||
if not self.servers:
|
|
||||||
await self.start_servers()
|
|
||||||
|
|
||||||
sessions = [session for session in self.sessions
|
sessions = [session for session in self.sessions
|
||||||
if isinstance(session, ElectrumX)]
|
if isinstance(session, ElectrumX)]
|
||||||
self.ElectrumX.notify(sessions, height, touched)
|
ElectrumX.notify(sessions, height, touched)
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
'''Close the listening servers.'''
|
'''Close the listening servers.'''
|
||||||
@ -157,7 +156,7 @@ class ServerManager(LoggedClass):
|
|||||||
return self.irc.peers
|
return self.irc.peers
|
||||||
|
|
||||||
def session_count(self):
|
def session_count(self):
|
||||||
return len(self.manager.sessions)
|
return len(self.sessions)
|
||||||
|
|
||||||
def info(self):
|
def info(self):
|
||||||
'''Returned in the RPC 'getinfo' call.'''
|
'''Returned in the RPC 'getinfo' call.'''
|
||||||
@ -179,6 +178,9 @@ class ServerManager(LoggedClass):
|
|||||||
session.peername(),
|
session.peername(),
|
||||||
len(session.hash168s),
|
len(session.hash168s),
|
||||||
'RPC' if isinstance(session, LocalRPC) else session.client,
|
'RPC' if isinstance(session, LocalRPC) else session.client,
|
||||||
|
session.recv_count, session.recv_size,
|
||||||
|
session.send_count, session.send_size,
|
||||||
|
session.error_count,
|
||||||
now - session.start)
|
now - session.start)
|
||||||
for session in self.sessions]
|
for session in self.sessions]
|
||||||
|
|
||||||
@ -211,7 +213,7 @@ class Session(JSONRPC):
|
|||||||
'Sent {:,d} bytes in {:,d} messages {:,d} errors'
|
'Sent {:,d} bytes in {:,d} messages {:,d} errors'
|
||||||
.format(self.peername(), self.send_size,
|
.format(self.peername(), self.send_size,
|
||||||
self.send_count, self.error_count))
|
self.send_count, self.error_count))
|
||||||
self.maanger.remove_session(self)
|
self.manager.remove_session(self)
|
||||||
|
|
||||||
def method_handler(self, method):
|
def method_handler(self, method):
|
||||||
'''Return the handler that will handle the RPC method.'''
|
'''Return the handler that will handle the RPC method.'''
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user