From a82ab0b07577c624031e725ce780d4bcb77c30b9 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sun, 4 Dec 2016 21:14:45 +0900 Subject: [PATCH 1/6] Add feature to rate limit connections --- docs/ENV-NOTES | 3 +++ server/env.py | 2 ++ server/protocol.py | 37 ++++++++++++++++++++++++++----------- 3 files changed, 31 insertions(+), 11 deletions(-) diff --git a/docs/ENV-NOTES b/docs/ENV-NOTES index 58baf97..d16f567 100644 --- a/docs/ENV-NOTES +++ b/docs/ENV-NOTES @@ -83,6 +83,9 @@ BANDWIDTH_LIMIT - per-session periodic bandwith usage limit in bytes. end of each period. Currently the period is hard-coded to be one hour. The default limit value is 2 million bytes. +CONN_LIMIT - the number of new incoming connections is limited to +CONN_LIMIT_SECS CONN_LIMIT connections every CONN_LIMIT_SECS. + The default is 10 every 10 seconds. SESSION_TIMEOUT - an integer number of seconds defaulting to 600. Sessions with no activity for longer than this are disconnected. diff --git a/server/env.py b/server/env.py index f13c56e..5f30ac6 100644 --- a/server/env.py +++ b/server/env.py @@ -50,6 +50,8 @@ class Env(LoggedClass): self.max_subs = self.integer('MAX_SUBS', 250000) self.max_session_subs = self.integer('MAX_SESSION_SUBS', 50000) self.bandwidth_limit = self.integer('BANDWIDTH_LIMIT', 2000000) + self.conn_limit = self.integer('CONN_LIMIT', 10) + self.conn_limit_secs = self.integer('CONN_LIMIT_SECS', 10) self.session_timeout = self.integer('SESSION_TIMEOUT', 600) # IRC self.irc = self.default('IRC', False) diff --git a/server/protocol.py b/server/protocol.py index 6696c52..8ed4a73 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -233,11 +233,15 @@ class ServerManager(util.LoggedClass): self.next_stale_check = 0 self.history_cache = pylru.lrucache(512) self.futures = [] + self.cl_kizami = 0 + self.cl_session_id = 0 env.max_send = max(350000, env.max_send) self.logger.info('session timeout: {:,d} seconds' .format(env.session_timeout)) self.logger.info('session bandwidth limit {:,d} bytes' .format(env.bandwidth_limit)) + self.logger.info('new sessions limited to {:,d} every {:,d}s' + .format(env.conn_limit, env.conn_limit_secs)) self.logger.info('max response size {:,d} bytes'.format(env.max_send)) self.logger.info('max subscriptions across all sessions: {:,d}' .format(self.max_subs)) @@ -379,18 +383,30 @@ class ServerManager(util.LoggedClass): .format(len(self.sessions))) def add_session(self, session): - self.clear_stale_sessions() - coro = session.serve_requests() - future = asyncio.ensure_future(coro) - self.sessions[session] = future - session.log_info('connection from {}, {:,d} total' - .format(session.peername(), len(self.sessions))) - # Some connections are acknowledged after the servers are closed - if not self.servers: - self.close_session(session) + now = time.time() + self.clear_stale_sessions(now) + if now > self.cl_kizami: + self.cl_kizami = now + self.env.conn_limit_secs + self.cl_session_id = session.id_ + count = session.id_ - self.cl_session_id + if count > self.env.conn_limit: + session.log_info('closing connection from {}: {:,d} in last {:,d}s' + .format(session.peername(), count, + self.env.conn_limit_secs)) + session.transport.close() + else: + coro = session.serve_requests() + future = asyncio.ensure_future(coro) + self.sessions[session] = future + session.log_info('connection from {}, {:,d} total' + .format(session.peername(), len(self.sessions))) + # Some connections are acknowledged after the servers are closed + if not self.servers: + self.close_session(session) def remove_session(self, session): # It might have been forcefully removed earlier by close_session() + # or never added because of connection rate limiting if session in self.sessions: self.subscription_count -= session.sub_count() future = self.sessions.pop(session) @@ -407,9 +423,8 @@ class ServerManager(util.LoggedClass): session.log_me = not session.log_me return 'log {:d}: {}'.format(session.id_, session.log_me) - def clear_stale_sessions(self): + def clear_stale_sessions(self, now): '''Cut off sessions that haven't done anything for 10 minutes.''' - now = time.time() if now > self.next_stale_check: self.next_stale_check = now + 60 cutoff = now - self.env.session_timeout From be8adbbcd574242e319669676e6e07bdc3aef7c3 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sun, 4 Dec 2016 23:41:07 +0900 Subject: [PATCH 2/6] Fix some typos. --- lib/jsonrpc.py | 2 +- server/protocol.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/jsonrpc.py b/lib/jsonrpc.py index 861b565..d3d7390 100644 --- a/lib/jsonrpc.py +++ b/lib/jsonrpc.py @@ -145,7 +145,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): self.recv_size += len(data) self.using_bandwidth(len(data)) - # Close abuvsive connections where buffered data exceeds limit + # Close abusive connections where buffered data exceeds limit buffer_size = len(data) + sum(len(part) for part in self.parts) if buffer_size > self.max_buffer_size: self.log_error('read buffer of {:,d} bytes exceeds {:,d} ' diff --git a/server/protocol.py b/server/protocol.py index 8ed4a73..8d348b2 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -552,8 +552,8 @@ class Session(JSONRPC): '''Base class of ElectrumX JSON session protocols. Each session runs its tasks in asynchronous parallelism with other - sessions. To prevent some sessions blocking othersr, potentially - long-running requests should yield (not yet implemented). + sessions. To prevent some sessions blocking others, potentially + long-running requests should yield. ''' def __init__(self, manager, bp, env, kind): From bc9027094b5e803cf895621f39dc1b8bed9a5878 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 5 Dec 2016 06:37:22 +0900 Subject: [PATCH 3/6] Fix environment variable (bauerj) --- electrumx_rpc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/electrumx_rpc.py b/electrumx_rpc.py index 47cb8f6..4535000 100755 --- a/electrumx_rpc.py +++ b/electrumx_rpc.py @@ -58,7 +58,7 @@ def main(): args = parser.parse_args() if args.port is None: - args.port = int(environ.get('ELECTRUMX_RPC_PORT', 8000)) + args.port = int(environ.get('RPC_PORT', 8000)) loop = asyncio.get_event_loop() coro = loop.create_connection(RPCClient, 'localhost', args.port) From 643c9906846f6b2aab7d1eca3b7aecbf866f4eab Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 5 Dec 2016 23:27:36 +0900 Subject: [PATCH 4/6] Revert "Add feature to rate limit connections" This reverts commit a82ab0b07577c624031e725ce780d4bcb77c30b9. --- docs/ENV-NOTES | 3 --- server/env.py | 2 -- server/protocol.py | 37 +++++++++++-------------------------- 3 files changed, 11 insertions(+), 31 deletions(-) diff --git a/docs/ENV-NOTES b/docs/ENV-NOTES index d16f567..58baf97 100644 --- a/docs/ENV-NOTES +++ b/docs/ENV-NOTES @@ -83,9 +83,6 @@ BANDWIDTH_LIMIT - per-session periodic bandwith usage limit in bytes. end of each period. Currently the period is hard-coded to be one hour. The default limit value is 2 million bytes. -CONN_LIMIT - the number of new incoming connections is limited to -CONN_LIMIT_SECS CONN_LIMIT connections every CONN_LIMIT_SECS. - The default is 10 every 10 seconds. SESSION_TIMEOUT - an integer number of seconds defaulting to 600. Sessions with no activity for longer than this are disconnected. diff --git a/server/env.py b/server/env.py index 5f30ac6..f13c56e 100644 --- a/server/env.py +++ b/server/env.py @@ -50,8 +50,6 @@ class Env(LoggedClass): self.max_subs = self.integer('MAX_SUBS', 250000) self.max_session_subs = self.integer('MAX_SESSION_SUBS', 50000) self.bandwidth_limit = self.integer('BANDWIDTH_LIMIT', 2000000) - self.conn_limit = self.integer('CONN_LIMIT', 10) - self.conn_limit_secs = self.integer('CONN_LIMIT_SECS', 10) self.session_timeout = self.integer('SESSION_TIMEOUT', 600) # IRC self.irc = self.default('IRC', False) diff --git a/server/protocol.py b/server/protocol.py index 8d348b2..cbe8985 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -233,15 +233,11 @@ class ServerManager(util.LoggedClass): self.next_stale_check = 0 self.history_cache = pylru.lrucache(512) self.futures = [] - self.cl_kizami = 0 - self.cl_session_id = 0 env.max_send = max(350000, env.max_send) self.logger.info('session timeout: {:,d} seconds' .format(env.session_timeout)) self.logger.info('session bandwidth limit {:,d} bytes' .format(env.bandwidth_limit)) - self.logger.info('new sessions limited to {:,d} every {:,d}s' - .format(env.conn_limit, env.conn_limit_secs)) self.logger.info('max response size {:,d} bytes'.format(env.max_send)) self.logger.info('max subscriptions across all sessions: {:,d}' .format(self.max_subs)) @@ -383,30 +379,18 @@ class ServerManager(util.LoggedClass): .format(len(self.sessions))) def add_session(self, session): - now = time.time() - self.clear_stale_sessions(now) - if now > self.cl_kizami: - self.cl_kizami = now + self.env.conn_limit_secs - self.cl_session_id = session.id_ - count = session.id_ - self.cl_session_id - if count > self.env.conn_limit: - session.log_info('closing connection from {}: {:,d} in last {:,d}s' - .format(session.peername(), count, - self.env.conn_limit_secs)) - session.transport.close() - else: - coro = session.serve_requests() - future = asyncio.ensure_future(coro) - self.sessions[session] = future - session.log_info('connection from {}, {:,d} total' - .format(session.peername(), len(self.sessions))) - # Some connections are acknowledged after the servers are closed - if not self.servers: - self.close_session(session) + self.clear_stale_sessions() + coro = session.serve_requests() + future = asyncio.ensure_future(coro) + self.sessions[session] = future + session.log_info('connection from {}, {:,d} total' + .format(session.peername(), len(self.sessions))) + # Some connections are acknowledged after the servers are closed + if not self.servers: + self.close_session(session) def remove_session(self, session): # It might have been forcefully removed earlier by close_session() - # or never added because of connection rate limiting if session in self.sessions: self.subscription_count -= session.sub_count() future = self.sessions.pop(session) @@ -423,8 +407,9 @@ class ServerManager(util.LoggedClass): session.log_me = not session.log_me return 'log {:d}: {}'.format(session.id_, session.log_me) - def clear_stale_sessions(self, now): + def clear_stale_sessions(self): '''Cut off sessions that haven't done anything for 10 minutes.''' + now = time.time() if now > self.next_stale_check: self.next_stale_check = now + 60 cutoff = now - self.env.session_timeout From 1f1f0f42f8edc0937e0568b97e120993379b80d0 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Tue, 6 Dec 2016 06:36:11 +0900 Subject: [PATCH 5/6] Remove invalidated history cache on notifications --- server/protocol.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/server/protocol.py b/server/protocol.py index cbe8985..70d5014 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -15,7 +15,7 @@ import ssl import time import traceback from collections import defaultdict, namedtuple -from functools import partial, lru_cache +from functools import partial import pylru @@ -231,7 +231,7 @@ class ServerManager(util.LoggedClass): self.max_subs = env.max_subs self.subscription_count = 0 self.next_stale_check = 0 - self.history_cache = pylru.lrucache(512) + self.history_cache = pylru.lrucache(128) self.futures = [] env.max_send = max(350000, env.max_send) self.logger.info('session timeout: {:,d} seconds' @@ -318,6 +318,10 @@ class ServerManager(util.LoggedClass): def notify(self, touched): '''Notify sessions about height changes and touched addresses.''' + # Remove invalidated history cache + hc = self.history_cache + for hash168 in set(hc).intersection(touched): + del hc[hash168] cache = {} for session in self.sessions: if isinstance(session, ElectrumX): From 36a06ea2c2128440e8f1b8bc43f33b17a8149445 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Tue, 6 Dec 2016 06:40:19 +0900 Subject: [PATCH 6/6] Prepare 0.8.4 --- README.rst | 6 ++---- RELEASE-NOTES | 5 +++++ server/version.py | 2 +- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/README.rst b/README.rst index 6b32929..ccd46e0 100644 --- a/README.rst +++ b/README.rst @@ -113,10 +113,8 @@ be necessary. Roadmap Pre-1.0 =============== -- minor code cleanups -- at most 1 more DB format change; I will make a weak attempt to - retain 0.6 release's DB format if possible -- provision of bandwidth limit controls +- minor code cleanups. It is unlikely DB format will change +- better DoS protections and mempool handling - implement simple protocol to discover peers without resorting to IRC diff --git a/RELEASE-NOTES b/RELEASE-NOTES index 6140ddc..78c83b1 100644 --- a/RELEASE-NOTES +++ b/RELEASE-NOTES @@ -1,3 +1,8 @@ +version 0.8.4 +------------- + +- remove invalidated histories from cache on new block + version 0.8.3 ------------- diff --git a/server/version.py b/server/version.py index 5e72ebe..23e3031 100644 --- a/server/version.py +++ b/server/version.py @@ -1 +1 @@ -VERSION = "ElectrumX 0.8.3" +VERSION = "ElectrumX 0.8.4"