From c08ade586188da6ff4f5412d7ef53760b9896dc8 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Tue, 29 Nov 2016 07:53:07 +0900 Subject: [PATCH] Add session logging facility Move session logging code to protocol.py from electrum_rpc.py Use it for periodic logging controlled by envvar LOG_SESSIONS For each session, track sent transaction stats and show that per-session instead of errors --- docs/ENV-NOTES | 3 +++ electrumx_rpc.py | 26 +++--------------- server/env.py | 1 + server/protocol.py | 66 +++++++++++++++++++++++++++++++++++++++------- 4 files changed, 63 insertions(+), 33 deletions(-) diff --git a/docs/ENV-NOTES b/docs/ENV-NOTES index ee70098..c18593f 100644 --- a/docs/ENV-NOTES +++ b/docs/ENV-NOTES @@ -36,6 +36,9 @@ BANNER_FILE - a path to a banner file to serve to clients. The banner file is re-read for each new client. The string $VERSION in your banner file will be replaced with the ElectrumX version you are runnning, such as 'ElectrumX 0.7.11'. +LOG_SESSIONS - the number of seconds between printing session statistics to + the log. Defaults to 3600. Set to zero to suppress this + logging. ANON_LOGS - set to anything non-empty to remove IP addresses from logs. By default IP addresses will be logged. DONATION_ADDRESS - server donation address. Defaults to none. diff --git a/electrumx_rpc.py b/electrumx_rpc.py index 052ba7c..192f887 100755 --- a/electrumx_rpc.py +++ b/electrumx_rpc.py @@ -17,6 +17,7 @@ from functools import partial from os import environ from lib.jsonrpc import JSONRPC +from server.protocol import ServerManager class RPCClient(JSONRPC): @@ -36,33 +37,12 @@ class RPCClient(JSONRPC): async def handle_response(self, result, error, method): if result and method == 'sessions': - self.print_sessions(result) + for line in ServerManager.sessions_text_lines(result): + print(line) else: value = {'error': error} if error else result print(json.dumps(value, indent=4, sort_keys=True)) - def print_sessions(self, result): - def data_fmt(count, size): - return '{:,d}/{:,d}KB'.format(count, size // 1024) - def time_fmt(t): - t = int(t) - return ('{:3d}:{:02d}:{:02d}' - .format(t // 3600, (t % 3600) // 60, t % 60)) - - fmt = ('{:<4} {:>23} {:>15} {:>7} ' - '{:>7} {:>7} {:>7} {:>7} {:>5} {:>9}') - print(fmt.format('Type', 'Peer', 'Client', 'Subs', - 'Recv #', 'Recv KB', 'Sent #', 'Sent KB', - 'Errs', 'Time')) - for (kind, peer, subs, client, recv_count, recv_size, - send_count, send_size, error_count, time) in result: - print(fmt.format(kind, peer, client, '{:,d}'.format(subs), - '{:,d}'.format(recv_count), - '{:,d}'.format(recv_size // 1024), - '{:,d}'.format(send_count), - '{:,d}'.format(send_size // 1024), - '{:,d}'.format(error_count), - time_fmt(time))) def main(): '''Send the RPC command to the server and print the result.''' diff --git a/server/env.py b/server/env.py index 6160cde..2ff44e0 100644 --- a/server/env.py +++ b/server/env.py @@ -41,6 +41,7 @@ class Env(LoggedClass): self.max_subscriptions = self.integer('MAX_SUBSCRIPTIONS', 10000) self.banner_file = self.default('BANNER_FILE', None) self.anon_logs = self.default('ANON_LOGS', False) + self.log_sessions = self.default('LOG_SESSIONS', 3600) # The electrum client takes the empty string as unspecified self.donation_address = self.default('DONATION_ADDRESS', '') self.db_engine = self.default('DB_ENGINE', 'leveldb') diff --git a/server/protocol.py b/server/protocol.py index bd84a1a..efca69b 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -224,6 +224,8 @@ class ServerManager(util.LoggedClass): self.env = env self.servers = [] self.sessions = {} + self.txs_sent = 0 + self.next_log_sessions = 0 self.max_subs = env.max_subs self.subscription_count = 0 self.futures = [] @@ -314,6 +316,13 @@ class ServerManager(util.LoggedClass): # Use a tuple to distinguish from JSON triple = (self.bp.db_height, touched, cache) session.messages.put_nowait(triple) + # Periodically log sessions + if self.env.log_sessions and time.time() > self.next_log_sessions: + data = self.session_data(for_log=True) + for line in ServerManager.sessions_text_lines(data): + self.logger.info(line) + self.logger.info(json.dumps(self.server_summary())) + self.next_log_sessions = time.time() + self.env.log_sessions async def shutdown(self): '''Call to shutdown the servers. Returns when done.''' @@ -372,32 +381,66 @@ class ServerManager(util.LoggedClass): return self.irc.peers def session_count(self): - '''Returns a dictionary.''' - active = len([s for s in self.sessions if s.send_count]) - total = len(self.sessions) - return {'active': active, 'inert': total - active, 'total': total} + '''The number of connections that we've sent something to.''' + return len([s for s in self.sessions if s.send_count]) - async def rpc_getinfo(self, params): - '''The RPC 'getinfo' call.''' + def server_summary(self): + '''A one-line summary of server state.''' return { 'blocks': self.bp.db_height, + 'errors': sum(s.error_count for s in self.sessions), 'peers': len(self.irc.peers), 'sessions': self.session_count(), + 'txs_sent': self.txs_sent, 'watched': self.subscription_count, } - async def rpc_sessions(self, params): + @staticmethod + def sessions_text_lines(data): + '''A generator returning lines for a list of sessions. + + data is the return value of rpc_sessions().''' + + def time_fmt(t): + t = int(t) + return ('{:3d}:{:02d}:{:02d}' + .format(t // 3600, (t % 3600) // 60, t % 60)) + + fmt = ('{:<4} {:>23} {:>15} {:>7} ' + '{:>7} {:>7} {:>7} {:>7} {:>5} {:>9}') + yield fmt.format('Type', 'Peer', 'Client', 'Subs', + 'Recv', 'Recv KB', 'Sent', 'Sent KB', + 'Txs', 'Time') + for (kind, peer, subs, client, recv_count, recv_size, + send_count, send_size, txs_sent, time) in data: + yield fmt.format(kind, peer, client, + '{:,d}'.format(subs), + '{:,d}'.format(recv_count), + '{:,d}'.format(recv_size // 1024), + '{:,d}'.format(send_count), + '{:,d}'.format(send_size // 1024), + '{:,d}'.format(txs_sent), + time_fmt(time)) + + def session_data(self, for_log): '''Returned to the RPC 'sessions' call.''' now = time.time() + sessions = sorted(self.sessions.keys(), key=lambda s: s.start) return [(session.kind, - session.peername(for_log=False), + session.peername(for_log=for_log), session.sub_count(), session.client, session.recv_count, session.recv_size, session.send_count, session.send_size, - session.error_count, + session.txs_sent, now - session.start) - for session in self.sessions] + for session in sessions] + + async def rpc_getinfo(self, params): + return self.server_summary() + + async def rpc_sessions(self, params): + return self.session_data(for_log=False) async def rpc_numsessions(self, params): return self.session_count() @@ -428,6 +471,7 @@ class Session(JSONRPC): self.client = 'unknown' self.anon_logs = env.anon_logs self.max_send = env.max_send + self.txs_sent = 0 def connection_made(self, transport): '''Handle an incoming client connection.''' @@ -776,6 +820,8 @@ class ElectrumX(Session): ''' try: tx_hash = await self.daemon.sendrawtransaction(params) + self.txs_sent += 1 + self.manager.txs_sent += 1 self.logger.info('sent tx: {}'.format(tx_hash)) return tx_hash except DaemonError as e: