Add per-session bandwidth limits
This commit is contained in:
parent
c9a10be5ba
commit
b65bcda504
@ -79,6 +79,10 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
|
|||||||
def __init__(self):
|
def __init__(self):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
self.start = time.time()
|
self.start = time.time()
|
||||||
|
self.bandwidth_start = self.start
|
||||||
|
self.bandwidth_interval = 3600
|
||||||
|
self.bandwidth_used = 0
|
||||||
|
self.bandwidth_limit = 5000000
|
||||||
self.transport = None
|
self.transport = None
|
||||||
# Parts of an incomplete JSON line. We buffer them until
|
# Parts of an incomplete JSON line. We buffer them until
|
||||||
# getting a newline.
|
# getting a newline.
|
||||||
@ -117,6 +121,13 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
|
|||||||
'''Handle client disconnection.'''
|
'''Handle client disconnection.'''
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
def using_bandwidth(self, amount):
|
||||||
|
now = time.time()
|
||||||
|
if now >= self.bandwidth_start + self.bandwidth_interval:
|
||||||
|
self.bandwidth_start = now
|
||||||
|
self.bandwidth_used = 0
|
||||||
|
self.bandwidth_used += amount
|
||||||
|
|
||||||
def data_received(self, data):
|
def data_received(self, data):
|
||||||
'''Handle incoming data (synchronously).
|
'''Handle incoming data (synchronously).
|
||||||
|
|
||||||
@ -124,6 +135,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
|
|||||||
decode_message for handling.
|
decode_message for handling.
|
||||||
'''
|
'''
|
||||||
self.recv_size += len(data)
|
self.recv_size += len(data)
|
||||||
|
self.using_bandwidth(len(data))
|
||||||
|
|
||||||
# Close abuvsive connections where buffered data exceeds limit
|
# Close abuvsive connections where buffered data exceeds limit
|
||||||
buffer_size = len(data) + sum(len(part) for part in self.parts)
|
buffer_size = len(data) + sum(len(part) for part in self.parts)
|
||||||
@ -213,6 +225,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
|
|||||||
else:
|
else:
|
||||||
self.send_count += 1
|
self.send_count += 1
|
||||||
self.send_size += len(data)
|
self.send_size += len(data)
|
||||||
|
self.using_bandwidth(len(data))
|
||||||
self.transport.write(data)
|
self.transport.write(data)
|
||||||
|
|
||||||
async def handle_message(self, message):
|
async def handle_message(self, message):
|
||||||
@ -220,6 +233,17 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
|
|||||||
|
|
||||||
Handles batches according to the JSON 2.0 spec.
|
Handles batches according to the JSON 2.0 spec.
|
||||||
'''
|
'''
|
||||||
|
# Throttle high-bandwidth connections by delaying processing
|
||||||
|
# their requests. Delay more the higher the excessive usage.
|
||||||
|
excess = self.bandwidth_used - self.bandwidth_limit
|
||||||
|
if excess > 0:
|
||||||
|
secs = 1 + excess // self.bandwidth_limit
|
||||||
|
self.logger.warning('{} has high bandwidth use of {:,d} bytes, '
|
||||||
|
'sleeping {:d}s'
|
||||||
|
.format(self.peername(), self.bandwidth_used,
|
||||||
|
secs))
|
||||||
|
await asyncio.sleep(secs)
|
||||||
|
|
||||||
if isinstance(message, list):
|
if isinstance(message, list):
|
||||||
payload = await self.batch_payload(message)
|
payload = await self.batch_payload(message)
|
||||||
else:
|
else:
|
||||||
|
|||||||
@ -49,6 +49,7 @@ class Env(LoggedClass):
|
|||||||
self.max_send = self.integer('MAX_SEND', 1000000)
|
self.max_send = self.integer('MAX_SEND', 1000000)
|
||||||
self.max_subs = self.integer('MAX_SUBS', 250000)
|
self.max_subs = self.integer('MAX_SUBS', 250000)
|
||||||
self.max_session_subs = self.integer('MAX_SESSION_SUBS', 50000)
|
self.max_session_subs = self.integer('MAX_SESSION_SUBS', 50000)
|
||||||
|
self.bandwidth_limit = self.integer('BANDWIDTH_LIMIT', 2000000)
|
||||||
# IRC
|
# IRC
|
||||||
self.report_tcp_port = self.integer('REPORT_TCP_PORT', self.tcp_port)
|
self.report_tcp_port = self.integer('REPORT_TCP_PORT', self.tcp_port)
|
||||||
self.report_ssl_port = self.integer('REPORT_SSL_PORT', self.ssl_port)
|
self.report_ssl_port = self.integer('REPORT_SSL_PORT', self.ssl_port)
|
||||||
|
|||||||
@ -230,6 +230,8 @@ class ServerManager(util.LoggedClass):
|
|||||||
self.subscription_count = 0
|
self.subscription_count = 0
|
||||||
self.futures = []
|
self.futures = []
|
||||||
env.max_send = max(350000, env.max_send)
|
env.max_send = max(350000, env.max_send)
|
||||||
|
self.logger.info('session bandwidth limit {:,d} bytes'
|
||||||
|
.format(env.bandwidth_limit))
|
||||||
self.logger.info('max response size {:,d} bytes'.format(env.max_send))
|
self.logger.info('max response size {:,d} bytes'.format(env.max_send))
|
||||||
self.logger.info('max subscriptions across all sessions: {:,d}'
|
self.logger.info('max subscriptions across all sessions: {:,d}'
|
||||||
.format(self.max_subs))
|
.format(self.max_subs))
|
||||||
@ -471,6 +473,7 @@ class Session(JSONRPC):
|
|||||||
self.client = 'unknown'
|
self.client = 'unknown'
|
||||||
self.anon_logs = env.anon_logs
|
self.anon_logs = env.anon_logs
|
||||||
self.max_send = env.max_send
|
self.max_send = env.max_send
|
||||||
|
self.bandwidth_limit = env.bandwidth_limit
|
||||||
self.txs_sent = 0
|
self.txs_sent = 0
|
||||||
|
|
||||||
def connection_made(self, transport):
|
def connection_made(self, transport):
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user