From e2f4847632b058c28c0baed0c5bb4a35fda4a095 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Tue, 13 Dec 2016 22:15:04 +0900 Subject: [PATCH] New env var MAX_SESSIONS When the number of sessions reaches MAX_SESSIONS, which defaults to 1,000, turn off TCP and SSL listening sockets to prevent new connections. When the session count falls below a low watermark, currently 90% of MAX_SESSIONS, the listening sockets will be re-opened. Helps prevent DoS and limit open file usage. Bug fix: do not start serving paused connections until the buffer socket is sufficiently drained. Also, loop. --- docs/ENV-NOTES | 3 ++ server/block_processor.py | 6 +-- server/env.py | 1 + server/protocol.py | 88 ++++++++++++++++++++++++--------------- 4 files changed, 61 insertions(+), 37 deletions(-) diff --git a/docs/ENV-NOTES b/docs/ENV-NOTES index 9861671..840397f 100644 --- a/docs/ENV-NOTES +++ b/docs/ENV-NOTES @@ -50,6 +50,9 @@ in ElectrumX are very cheap - they consume about 100 bytes of memory each and are processed efficiently. I feel the defaults are low and encourage you to raise them. +MAX_SESSIONS - maximum number of sessions. Once reached, TCP and SSL + listening sockets are closed until the session count drops + naturally to 95% of the limit. Defaults to 1,000. MAX_SEND - maximum size of a response message to send over the wire, in bytes. Defaults to 1,000,000 and will treat values smaller than 350,000 as 350,000 because standard Electrum diff --git a/server/block_processor.py b/server/block_processor.py index 50d3d7c..a6b6cab 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -632,9 +632,9 @@ class BlockProcessor(server.db.DB): Value: HASH168 + TX_NUM + VALUE (21 + 4 + 8 = 33 bytes) That's 67 bytes of raw data. Python dictionary overhead means - each entry actually uses about 187 bytes of memory. So almost - 11.5 million UTXOs can fit in 2GB of RAM. There are approximately - 42 million UTXOs on bitcoin mainnet at height 433,000. + each entry actually uses about 187 bytes of memory. So over 5 + million UTXOs can fit in 1GB of RAM. There are approximately 42 + million UTXOs on bitcoin mainnet at height 433,000. Semantics: diff --git a/server/env.py b/server/env.py index f13c56e..b7e5140 100644 --- a/server/env.py +++ b/server/env.py @@ -48,6 +48,7 @@ class Env(LoggedClass): # Server limits to help prevent DoS self.max_send = self.integer('MAX_SEND', 1000000) self.max_subs = self.integer('MAX_SUBS', 250000) + self.max_sessions = self.integer('MAX_SESSIONS', 1000) self.max_session_subs = self.integer('MAX_SESSION_SUBS', 50000) self.bandwidth_limit = self.integer('BANDWIDTH_LIMIT', 2000000) self.session_timeout = self.integer('SESSION_TIMEOUT', 600) diff --git a/server/protocol.py b/server/protocol.py index 5a8bf87..3f0f51c 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -38,6 +38,7 @@ class ServerManager(util.LoggedClass): ''' BANDS = 5 + CATCHING_UP, LISTENING, PAUSED, SHUTTING_DOWN = range(4) class NotificationRequest(RequestBase): def __init__(self, height, touched): @@ -60,11 +61,14 @@ class ServerManager(util.LoggedClass): self.touched, self.touched_event) self.irc = IRC(env) self.env = env - self.servers = [] + self.servers = {} self.sessions = {} self.groups = defaultdict(set) self.txs_sent = 0 self.next_log_sessions = 0 + self.state = self.CATCHING_UP + self.max_sessions = env.max_sessions + self.low_watermark = self.max_sessions * 19 // 20 self.max_subs = env.max_subs self.subscription_count = 0 self.next_stale_check = 0 @@ -77,6 +81,7 @@ class ServerManager(util.LoggedClass): self.futures = [] env.max_send = max(350000, env.max_send) self.setup_bands() + self.logger.info('max session count: {:,d}'.format(self.max_sessions)) self.logger.info('session timeout: {:,d} seconds' .format(env.session_timeout)) self.logger.info('session bandwidth limit {:,d} bytes' @@ -123,16 +128,23 @@ class ServerManager(util.LoggedClass): + bisect_left(self.bands, group_bandwidth) + 1) // 2 async def enqueue_delayed_sessions(self): - now = time.time() - keep = [] - for pair in self.delayed_sessions: - timeout, session = pair - if timeout <= now: - self.queue.put_nowait(session) - else: - keep.append(pair) - self.delayed_sessions = keep - await asyncio.sleep(1) + while True: + now = time.time() + keep = [] + for pair in self.delayed_sessions: + timeout, session = pair + if not session.pause and timeout <= now: + self.queue.put_nowait(session) + else: + keep.append(pair) + self.delayed_sessions = keep + + # If paused and session count has fallen, start listening again + if (len(self.sessions) <= self.low_watermark + and self.state == self.PAUSED): + await self.start_external_servers() + + await asyncio.sleep(1) def enqueue_session(self, session): # Might have disconnected whilst waiting @@ -143,8 +155,6 @@ class ServerManager(util.LoggedClass): self.next_queue_id += 1 secs = int(session.pause) - if secs: - session.log_info('delaying processing whilst paused') excess = priority - self.BANDS if excess > 0: secs = excess @@ -185,6 +195,14 @@ class ServerManager(util.LoggedClass): await self.shutdown() await asyncio.sleep(1) + def close_servers(self, kinds): + '''Close the servers of the given kinds (TCP etc.).''' + for kind in kinds: + server = self.servers.pop(kind, None) + if server: + server.close() + # Don't bother awaiting the close - we're not async + async def start_server(self, kind, *args, **kw_args): protocol_class = LocalRPC if kind == 'RPC' else ElectrumX protocol = partial(protocol_class, self, self.bp, self.env, kind) @@ -192,7 +210,7 @@ class ServerManager(util.LoggedClass): host, port = args[:2] try: - self.servers.append(await server) + self.servers[kind] = await server except Exception as e: self.logger.error('{} server failed to listen on {}:{:d} :{}' .format(kind, host, port, e)) @@ -201,21 +219,22 @@ class ServerManager(util.LoggedClass): .format(kind, host, port)) async def start_servers(self, caught_up): - '''Connect to IRC and start listening for incoming connections. - - Only connect to IRC if enabled. Start listening on RCP, TCP - and SSL ports only if the port wasn't pecified. Waits for the - caught_up event to be signalled. - ''' + '''Start RPC, TCP and SSL servers once caught up.''' await caught_up.wait() - env = self.env - if env.rpc_port is not None: - await self.start_server('RPC', 'localhost', env.rpc_port) + if self.env.rpc_port is not None: + await self.start_server('RPC', 'localhost', self.env.rpc_port) + await self.start_external_servers() + async def start_external_servers(self): + '''Start listening on TCP and SSL ports, but only if the respective + port was given in the environment. + ''' + self.state = self.LISTENING + + env= self.env if env.tcp_port is not None: await self.start_server('TCP', env.host, env.tcp_port) - if env.ssl_port is not None: # Python 3.5.3: use PROTOCOL_TLS sslc = ssl.SSLContext(ssl.PROTOCOL_SSLv23) @@ -282,15 +301,13 @@ class ServerManager(util.LoggedClass): return history async def shutdown(self): - '''Call to shutdown the servers. Returns when done.''' + '''Call to shutdown everything. Returns when done.''' + self.state = self.SHUTTING_DOWN + self.close_servers(list(self.servers.keys())) self.bp.shutdown() # Don't cancel the block processor main loop - let it close itself for future in self.futures[1:]: future.cancel() - for server in self.servers: - server.close() - await server.wait_closed() - self.servers = [] # So add_session closes new sessions if self.sessions: await self.close_sessions() @@ -308,9 +325,6 @@ class ServerManager(util.LoggedClass): .format(len(self.sessions))) def add_session(self, session): - # Some connections are acknowledged after the servers are closed - if not self.servers: - return now = time.time() if now > self.next_stale_check: self.next_stale_check = now + 300 @@ -320,10 +334,16 @@ class ServerManager(util.LoggedClass): self.sessions[session] = group session.log_info('connection from {}, {:,d} total' .format(session.peername(), len(self.sessions))) + if (len(self.sessions) >= self.max_sessions + and self.state == self.LISTENING): + self.state = self.PAUSED + session.log_info('maximum sessions {:,d} reached, stopping new ' + 'connections until count drops to {:,d}' + .format(self.max_sessions, self.low_watermark)) + self.close_servers(['TCP', 'SSL']) def remove_session(self, session): - # This test should always be True. However if a bug messes - # things up it prevents consequent log noise + '''Remove a session from our sessions list if there.''' if session in self.sessions: group = self.sessions.pop(session) group.remove(session)