diff --git a/RELEASE-NOTES b/RELEASE-NOTES index 8f532cb..a4089ff 100644 --- a/RELEASE-NOTES +++ b/RELEASE-NOTES @@ -1,3 +1,9 @@ +version 0.8.9 +------------- + +- RPC groups and sessions calls improved +- issues fixed: #62, #68 (slow socket closing, IRC) + version 0.8.8 ------------- diff --git a/docs/HOWTO.rst b/docs/HOWTO.rst index 20cae89..00cdb49 100644 --- a/docs/HOWTO.rst +++ b/docs/HOWTO.rst @@ -75,6 +75,21 @@ on an SSD:: mkdir /path/to/db_directory chown electrumx /path/to/db_directory +Process limits +-------------- + +You should ensure the ElectrumX process has a large open file limit. +During sync it should not need more than about 1,024 open files. When +serving it will use approximately 256 for LevelDB plus the number of +incoming connections. It is not unusual to have 1,000 to 2,000 +connections being served, so I suggest you set your open files limit +to at least 2,500. + +Note that setting the limit in your shell does NOT affect ElectrumX +unless you are invoking ElectrumX directly from your shell. If you +are using systemd, you need to set it in the .service file (see +samples/systemd/electrumx.service in the ElectrumX source). + Using daemontools ----------------- @@ -158,6 +173,10 @@ Once configured, you may want to start ElectrumX at boot:: systemctl enable electrumx +systemd is aggressive in shutting down processes. ElectrumX can need +several minutes to flush cached data to disk during sync. You should +set TimeoutStopSec to at least 10 mins in your .service file. + Sync Progress ============= diff --git a/electrumx_rpc.py b/electrumx_rpc.py index 1a6e0e2..b7e5708 100755 --- a/electrumx_rpc.py +++ b/electrumx_rpc.py @@ -46,8 +46,8 @@ class RPCClient(JSONRPC): await request.process(1) async def handle_response(self, result, error, method): - if result and method == 'sessions': - for line in ServerManager.sessions_text_lines(result): + if result and method in ('groups', 'sessions'): + for line in ServerManager.text_lines(method, result): print(line) else: value = {'error': error} if error else result diff --git a/lib/jsonrpc.py b/lib/jsonrpc.py index 4f30282..c13f2b0 100644 --- a/lib/jsonrpc.py +++ b/lib/jsonrpc.py @@ -145,6 +145,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): def __init__(self): super().__init__() self.start = time.time() + self.stop = 0 self.last_recv = self.start self.bandwidth_start = self.start self.bandwidth_interval = 3600 @@ -195,9 +196,9 @@ class JSONRPC(asyncio.Protocol, LoggedClass): pass def close_connection(self): + self.stop = time.time() if self.transport: self.transport.close() - self.socket.shutdown(socket.SHUT_RDWR) def using_bandwidth(self, amount): now = time.time() diff --git a/server/irc.py b/server/irc.py index 40ac9e6..349557e 100644 --- a/server/irc.py +++ b/server/irc.py @@ -156,11 +156,12 @@ class IRC(LoggedClass): try: ip_addr = socket.gethostbyname(line[1]) except socket.error: - # No IPv4 address could be resolved. Could be .onion or IPv6. + # Could be .onion or IPv6. ip_addr = line[1] peer = self.Peer(ip_addr, line[1], line[2:]) self.peers[nick] = peer - except IndexError: + except (IndexError, UnicodeError): + # UnicodeError comes from invalid domains (issue #68) pass diff --git a/server/protocol.py b/server/protocol.py index 2d11222..61766a4 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -462,11 +462,12 @@ class ServerManager(util.LoggedClass): self.logger.info('cleanly closing client sessions, please wait...') for session in self.sessions: self.close_session(session) - self.logger.info('server listening sockets closed, waiting ' + self.logger.info('listening sockets closed, waiting up to ' '{:d} seconds for socket cleanup'.format(secs)) limit = time.time() + secs while self.sessions and time.time() < limit: - await asyncio.sleep(4) + self.clear_stale_sessions(grace=secs//2) + await asyncio.sleep(2) self.logger.info('{:,d} sessions remaining' .format(len(self.sessions))) @@ -474,7 +475,10 @@ class ServerManager(util.LoggedClass): # Some connections are acknowledged after the servers are closed if not self.servers: return - self.clear_stale_sessions() + now = time.time() + if now > self.next_stale_check: + self.next_stale_check = now + 60 + self.clear_stale_sessions() group = self.groups[int(session.start - self.start) // 60] group.add(session) self.sessions[session] = group @@ -496,23 +500,30 @@ class ServerManager(util.LoggedClass): session.log_me = not session.log_me return 'log {:d}: {}'.format(session.id_, session.log_me) - def clear_stale_sessions(self): - '''Cut off sessions that haven't done anything for 10 minutes.''' + def clear_stale_sessions(self, grace=15): + '''Cut off sessions that haven't done anything for 10 minutes. Force + close stubborn connections that won't close cleanly after a + short grace period. + ''' now = time.time() - if now > self.next_stale_check: - self.next_stale_check = now + 60 - # Clear out empty groups - for key in [k for k, v in self.groups.items() if not v]: - del self.groups[key] - cutoff = now - self.env.session_timeout - stale = [session for session in self.sessions - if session.last_recv < cutoff - and not session.is_closing()] - for session in stale: - self.close_session(session) - if stale: - self.logger.info('closing stale connections {}' - .format([session.id_ for session in stale])) + shutdown_cutoff = now - grace + stale_cutoff = now - self.env.session_timeout + + stale = [] + for session in self.sessions: + if session.is_closing(): + if session.stop <= shutdown_cutoff and session.socket: + # Should trigger a call to connection_lost very soon + self.socket.shutdown(socket.SHUT_RDWR) + else: + if session.last_recv < stale_cutoff: + self.close_session(session) + stale.append(session.id_) + if stale: + self.logger.info('closing stale connections {}'.format(stale)) + # Clear out empty groups + for key in [k for k, v in self.groups.items() if not v]: + del self.groups[key] def new_subscription(self): if self.subscription_count >= self.max_subs: @@ -542,6 +553,52 @@ class ServerManager(util.LoggedClass): 'watched': self.subscription_count, } + @staticmethod + def text_lines(method, data): + if method == 'sessions': + return ServerManager.sessions_text_lines(data) + else: + return ServerManager.groups_text_lines(data) + + @staticmethod + def groups_text_lines(data): + '''A generator returning lines for a list of groups. + + data is the return value of rpc_groups().''' + + fmt = ('{:<6} {:>9} {:>6} {:>6} {:>8}' + '{:>7} {:>9} {:>7} {:>9}') + yield fmt.format('ID', 'Bw Qta KB', 'Reqs', 'Txs', 'Subs', + 'Recv', 'Recv KB', 'Sent', 'Sent KB') + for (id_, bandwidth, reqs, txs_sent, subs, + recv_count, recv_size, send_count, send_size) in data: + yield fmt.format(id_, + '{:,d}'.format(bandwidth // 1024), + '{:,d}'.format(reqs), + '{:,d}'.format(txs_sent), + '{:,d}'.format(subs), + '{:,d}'.format(recv_count), + '{:,d}'.format(recv_size // 1024), + '{:,d}'.format(send_count), + '{:,d}'.format(send_size // 1024)) + + def group_data(self): + '''Returned to the RPC 'groups' call.''' + result = [] + for group_id in sorted(self.groups.keys()): + sessions = self.groups[group_id] + result.append([group_id, + sum(s.bandwidth_used for s in sessions), + sum(s.requests_remaining() for s in sessions), + sum(s.txs_sent for s in sessions), + sum(s.sub_count() for s in sessions), + sum(s.recv_count for s in sessions), + sum(s.recv_size for s in sessions), + sum(s.send_count for s in sessions), + sum(s.send_size for s in sessions), + ]) + return result + @staticmethod def sessions_text_lines(data): '''A generator returning lines for a list of sessions. @@ -553,11 +610,10 @@ class ServerManager(util.LoggedClass): return ('{:3d}:{:02d}:{:02d}' .format(t // 3600, (t % 3600) // 60, t % 60)) - fmt = ('{:<6} {:<5} {:>23} {:>15} {:>7} ' - '{:>7} {:>7} {:>7} {:>7} {:>5} {:>9}') - yield fmt.format('ID', 'Flags', 'Peer', 'Client', 'Reqs', - 'Txs', 'Subs', 'Recv', 'Recv KB', 'Sent', - 'Sent KB', 'Time') + fmt = ('{:<6} {:<5} {:>23} {:>15} {:>5} {:>5} ' + '{:>7} {:>7} {:>7} {:>7} {:>7} {:>9}') + yield fmt.format('ID', 'Flags', 'Peer', 'Client', 'Reqs', 'Txs', + 'Subs', 'Recv', 'Recv KB', 'Sent', 'Sent KB', 'Time') for (id_, flags, peer, client, reqs, txs_sent, subs, recv_count, recv_size, send_count, send_size, time) in data: yield fmt.format(id_, flags, peer, client, @@ -617,13 +673,7 @@ class ServerManager(util.LoggedClass): return self.server_summary() async def rpc_groups(self, params): - result = {} - msg = '{:,d} sessions, {:,d} requests, {:,d}KB b/w quota used' - for group, sessions in self.groups.items(): - bandwidth = sum(s.bandwidth_used for s in sessions) - reqs = sum(s.requests_remaining() for s in sessions) - result[group] = msg.format(len(sessions), reqs, bandwidth // 1024) - return result + return self.group_data() async def rpc_sessions(self, params): return self.session_data(for_log=False) diff --git a/server/version.py b/server/version.py index e9d2f68..7f649f1 100644 --- a/server/version.py +++ b/server/version.py @@ -1 +1 @@ -VERSION = "ElectrumX 0.8.8a" +VERSION = "ElectrumX 0.8.9"