Merge branch 'log_sessions' into develop
This commit is contained in:
commit
22c1bbcf03
@ -36,6 +36,9 @@ BANNER_FILE - a path to a banner file to serve to clients. The banner file
|
||||
is re-read for each new client. The string $VERSION in your
|
||||
banner file will be replaced with the ElectrumX version you
|
||||
are runnning, such as 'ElectrumX 0.7.11'.
|
||||
LOG_SESSIONS - the number of seconds between printing session statistics to
|
||||
the log. Defaults to 3600. Set to zero to suppress this
|
||||
logging.
|
||||
ANON_LOGS - set to anything non-empty to remove IP addresses from
|
||||
logs. By default IP addresses will be logged.
|
||||
DONATION_ADDRESS - server donation address. Defaults to none.
|
||||
|
||||
@ -17,6 +17,7 @@ from functools import partial
|
||||
from os import environ
|
||||
|
||||
from lib.jsonrpc import JSONRPC
|
||||
from server.protocol import ServerManager
|
||||
|
||||
|
||||
class RPCClient(JSONRPC):
|
||||
@ -36,33 +37,12 @@ class RPCClient(JSONRPC):
|
||||
|
||||
async def handle_response(self, result, error, method):
|
||||
if result and method == 'sessions':
|
||||
self.print_sessions(result)
|
||||
for line in ServerManager.sessions_text_lines(result):
|
||||
print(line)
|
||||
else:
|
||||
value = {'error': error} if error else result
|
||||
print(json.dumps(value, indent=4, sort_keys=True))
|
||||
|
||||
def print_sessions(self, result):
|
||||
def data_fmt(count, size):
|
||||
return '{:,d}/{:,d}KB'.format(count, size // 1024)
|
||||
def time_fmt(t):
|
||||
t = int(t)
|
||||
return ('{:3d}:{:02d}:{:02d}'
|
||||
.format(t // 3600, (t % 3600) // 60, t % 60))
|
||||
|
||||
fmt = ('{:<4} {:>23} {:>15} {:>7} '
|
||||
'{:>7} {:>7} {:>7} {:>7} {:>5} {:>9}')
|
||||
print(fmt.format('Type', 'Peer', 'Client', 'Subs',
|
||||
'Recv #', 'Recv KB', 'Sent #', 'Sent KB',
|
||||
'Errs', 'Time'))
|
||||
for (kind, peer, subs, client, recv_count, recv_size,
|
||||
send_count, send_size, error_count, time) in result:
|
||||
print(fmt.format(kind, peer, client, '{:,d}'.format(subs),
|
||||
'{:,d}'.format(recv_count),
|
||||
'{:,d}'.format(recv_size // 1024),
|
||||
'{:,d}'.format(send_count),
|
||||
'{:,d}'.format(send_size // 1024),
|
||||
'{:,d}'.format(error_count),
|
||||
time_fmt(time)))
|
||||
|
||||
def main():
|
||||
'''Send the RPC command to the server and print the result.'''
|
||||
|
||||
@ -41,6 +41,7 @@ class Env(LoggedClass):
|
||||
self.max_subscriptions = self.integer('MAX_SUBSCRIPTIONS', 10000)
|
||||
self.banner_file = self.default('BANNER_FILE', None)
|
||||
self.anon_logs = self.default('ANON_LOGS', False)
|
||||
self.log_sessions = self.default('LOG_SESSIONS', 3600)
|
||||
# The electrum client takes the empty string as unspecified
|
||||
self.donation_address = self.default('DONATION_ADDRESS', '')
|
||||
self.db_engine = self.default('DB_ENGINE', 'leveldb')
|
||||
|
||||
@ -224,6 +224,8 @@ class ServerManager(util.LoggedClass):
|
||||
self.env = env
|
||||
self.servers = []
|
||||
self.sessions = {}
|
||||
self.txs_sent = 0
|
||||
self.next_log_sessions = 0
|
||||
self.max_subs = env.max_subs
|
||||
self.subscription_count = 0
|
||||
self.futures = []
|
||||
@ -314,6 +316,13 @@ class ServerManager(util.LoggedClass):
|
||||
# Use a tuple to distinguish from JSON
|
||||
triple = (self.bp.db_height, touched, cache)
|
||||
session.messages.put_nowait(triple)
|
||||
# Periodically log sessions
|
||||
if self.env.log_sessions and time.time() > self.next_log_sessions:
|
||||
data = self.session_data(for_log=True)
|
||||
for line in ServerManager.sessions_text_lines(data):
|
||||
self.logger.info(line)
|
||||
self.logger.info(json.dumps(self.server_summary()))
|
||||
self.next_log_sessions = time.time() + self.env.log_sessions
|
||||
|
||||
async def shutdown(self):
|
||||
'''Call to shutdown the servers. Returns when done.'''
|
||||
@ -372,32 +381,66 @@ class ServerManager(util.LoggedClass):
|
||||
return self.irc.peers
|
||||
|
||||
def session_count(self):
|
||||
'''Returns a dictionary.'''
|
||||
active = len([s for s in self.sessions if s.send_count])
|
||||
total = len(self.sessions)
|
||||
return {'active': active, 'inert': total - active, 'total': total}
|
||||
'''The number of connections that we've sent something to.'''
|
||||
return len([s for s in self.sessions if s.send_count])
|
||||
|
||||
async def rpc_getinfo(self, params):
|
||||
'''The RPC 'getinfo' call.'''
|
||||
def server_summary(self):
|
||||
'''A one-line summary of server state.'''
|
||||
return {
|
||||
'blocks': self.bp.db_height,
|
||||
'errors': sum(s.error_count for s in self.sessions),
|
||||
'peers': len(self.irc.peers),
|
||||
'sessions': self.session_count(),
|
||||
'txs_sent': self.txs_sent,
|
||||
'watched': self.subscription_count,
|
||||
}
|
||||
|
||||
async def rpc_sessions(self, params):
|
||||
@staticmethod
|
||||
def sessions_text_lines(data):
|
||||
'''A generator returning lines for a list of sessions.
|
||||
|
||||
data is the return value of rpc_sessions().'''
|
||||
|
||||
def time_fmt(t):
|
||||
t = int(t)
|
||||
return ('{:3d}:{:02d}:{:02d}'
|
||||
.format(t // 3600, (t % 3600) // 60, t % 60))
|
||||
|
||||
fmt = ('{:<4} {:>23} {:>15} {:>7} '
|
||||
'{:>7} {:>7} {:>7} {:>7} {:>5} {:>9}')
|
||||
yield fmt.format('Type', 'Peer', 'Client', 'Subs',
|
||||
'Recv', 'Recv KB', 'Sent', 'Sent KB',
|
||||
'Txs', 'Time')
|
||||
for (kind, peer, subs, client, recv_count, recv_size,
|
||||
send_count, send_size, txs_sent, time) in data:
|
||||
yield fmt.format(kind, peer, client,
|
||||
'{:,d}'.format(subs),
|
||||
'{:,d}'.format(recv_count),
|
||||
'{:,d}'.format(recv_size // 1024),
|
||||
'{:,d}'.format(send_count),
|
||||
'{:,d}'.format(send_size // 1024),
|
||||
'{:,d}'.format(txs_sent),
|
||||
time_fmt(time))
|
||||
|
||||
def session_data(self, for_log):
|
||||
'''Returned to the RPC 'sessions' call.'''
|
||||
now = time.time()
|
||||
sessions = sorted(self.sessions.keys(), key=lambda s: s.start)
|
||||
return [(session.kind,
|
||||
session.peername(for_log=False),
|
||||
session.peername(for_log=for_log),
|
||||
session.sub_count(),
|
||||
session.client,
|
||||
session.recv_count, session.recv_size,
|
||||
session.send_count, session.send_size,
|
||||
session.error_count,
|
||||
session.txs_sent,
|
||||
now - session.start)
|
||||
for session in self.sessions]
|
||||
for session in sessions]
|
||||
|
||||
async def rpc_getinfo(self, params):
|
||||
return self.server_summary()
|
||||
|
||||
async def rpc_sessions(self, params):
|
||||
return self.session_data(for_log=False)
|
||||
|
||||
async def rpc_numsessions(self, params):
|
||||
return self.session_count()
|
||||
@ -428,6 +471,7 @@ class Session(JSONRPC):
|
||||
self.client = 'unknown'
|
||||
self.anon_logs = env.anon_logs
|
||||
self.max_send = env.max_send
|
||||
self.txs_sent = 0
|
||||
|
||||
def connection_made(self, transport):
|
||||
'''Handle an incoming client connection.'''
|
||||
@ -776,6 +820,8 @@ class ElectrumX(Session):
|
||||
'''
|
||||
try:
|
||||
tx_hash = await self.daemon.sendrawtransaction(params)
|
||||
self.txs_sent += 1
|
||||
self.manager.txs_sent += 1
|
||||
self.logger.info('sent tx: {}'.format(tx_hash))
|
||||
return tx_hash
|
||||
except DaemonError as e:
|
||||
|
||||
Loading…
Reference in New Issue
Block a user