Close stale sessions
New envvar SESSION_TIMEOUT A session with no activity is cut off after this time Fixes #56
This commit is contained in:
parent
7ff3d105f9
commit
3d2824218b
@ -83,6 +83,9 @@ BANDWIDTH_LIMIT - per-session periodic bandwith usage limit in bytes.
|
|||||||
end of each period. Currently the period is
|
end of each period. Currently the period is
|
||||||
hard-coded to be one hour. The default limit value
|
hard-coded to be one hour. The default limit value
|
||||||
is 2 million bytes.
|
is 2 million bytes.
|
||||||
|
SESSION_TIMEOUT - an integer number of seconds defaulting to 600.
|
||||||
|
Sessions with no activity for longer than this are
|
||||||
|
disconnected.
|
||||||
|
|
||||||
If you want IRC connectivity to advertise your node:
|
If you want IRC connectivity to advertise your node:
|
||||||
|
|
||||||
|
|||||||
@ -79,6 +79,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
|
|||||||
def __init__(self):
|
def __init__(self):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
self.start = time.time()
|
self.start = time.time()
|
||||||
|
self.last_recv = self.start
|
||||||
self.bandwidth_start = self.start
|
self.bandwidth_start = self.start
|
||||||
self.bandwidth_interval = 3600
|
self.bandwidth_interval = 3600
|
||||||
self.bandwidth_used = 0
|
self.bandwidth_used = 0
|
||||||
@ -155,6 +156,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
|
|||||||
if npos == -1:
|
if npos == -1:
|
||||||
self.parts.append(data)
|
self.parts.append(data)
|
||||||
break
|
break
|
||||||
|
self.last_recv = time.time()
|
||||||
self.recv_count += 1
|
self.recv_count += 1
|
||||||
tail, data = data[:npos], data[npos + 1:]
|
tail, data = data[:npos], data[npos + 1:]
|
||||||
parts, self.parts = self.parts, []
|
parts, self.parts = self.parts, []
|
||||||
|
|||||||
@ -50,6 +50,7 @@ class Env(LoggedClass):
|
|||||||
self.max_subs = self.integer('MAX_SUBS', 250000)
|
self.max_subs = self.integer('MAX_SUBS', 250000)
|
||||||
self.max_session_subs = self.integer('MAX_SESSION_SUBS', 50000)
|
self.max_session_subs = self.integer('MAX_SESSION_SUBS', 50000)
|
||||||
self.bandwidth_limit = self.integer('BANDWIDTH_LIMIT', 2000000)
|
self.bandwidth_limit = self.integer('BANDWIDTH_LIMIT', 2000000)
|
||||||
|
self.session_timeout = self.integer('SESSION_TIMEOUT', 600)
|
||||||
# IRC
|
# IRC
|
||||||
self.irc = self.default('IRC', False)
|
self.irc = self.default('IRC', False)
|
||||||
self.irc_nick = self.default('IRC_NICK', None)
|
self.irc_nick = self.default('IRC_NICK', None)
|
||||||
|
|||||||
@ -228,8 +228,11 @@ class ServerManager(util.LoggedClass):
|
|||||||
self.next_log_sessions = 0
|
self.next_log_sessions = 0
|
||||||
self.max_subs = env.max_subs
|
self.max_subs = env.max_subs
|
||||||
self.subscription_count = 0
|
self.subscription_count = 0
|
||||||
|
self.next_stale_check = 0
|
||||||
self.futures = []
|
self.futures = []
|
||||||
env.max_send = max(350000, env.max_send)
|
env.max_send = max(350000, env.max_send)
|
||||||
|
self.logger.info('session timeout: {:,d} seconds'
|
||||||
|
.format(env.session_timeout))
|
||||||
self.logger.info('session bandwidth limit {:,d} bytes'
|
self.logger.info('session bandwidth limit {:,d} bytes'
|
||||||
.format(env.bandwidth_limit))
|
.format(env.bandwidth_limit))
|
||||||
self.logger.info('max response size {:,d} bytes'.format(env.max_send))
|
self.logger.info('max response size {:,d} bytes'.format(env.max_send))
|
||||||
@ -354,6 +357,7 @@ class ServerManager(util.LoggedClass):
|
|||||||
.format(len(self.sessions)))
|
.format(len(self.sessions)))
|
||||||
|
|
||||||
def add_session(self, session):
|
def add_session(self, session):
|
||||||
|
self.clear_stale_sessions()
|
||||||
coro = session.serve_requests()
|
coro = session.serve_requests()
|
||||||
future = asyncio.ensure_future(coro)
|
future = asyncio.ensure_future(coro)
|
||||||
self.sessions[session] = future
|
self.sessions[session] = future
|
||||||
@ -373,6 +377,20 @@ class ServerManager(util.LoggedClass):
|
|||||||
future = self.sessions.pop(session)
|
future = self.sessions.pop(session)
|
||||||
future.cancel()
|
future.cancel()
|
||||||
|
|
||||||
|
def clear_stale_sessions(self):
|
||||||
|
'''Cut off sessions that haven't done anything for 10 minutes.'''
|
||||||
|
now = time.time()
|
||||||
|
if now > self.next_stale_check:
|
||||||
|
self.next_stale_check = now + 60
|
||||||
|
cutoff = now - self.env.session_timeout
|
||||||
|
stale = [session for session in self.sessions
|
||||||
|
if session.last_recv < cutoff]
|
||||||
|
for session in stale:
|
||||||
|
self.close_session(session)
|
||||||
|
if stale:
|
||||||
|
self.logger.info('dropped {:,d} stale connections'
|
||||||
|
.format(len(stale)))
|
||||||
|
|
||||||
def new_subscription(self):
|
def new_subscription(self):
|
||||||
if self.subscription_count >= self.max_subs:
|
if self.subscription_count >= self.max_subs:
|
||||||
raise JSONRPC.RPCError('server subscription limit {:,d} reached'
|
raise JSONRPC.RPCError('server subscription limit {:,d} reached'
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user