diff --git a/lib/jsonrpc.py b/lib/jsonrpc.py index 7773c96..1ed9f8d 100644 --- a/lib/jsonrpc.py +++ b/lib/jsonrpc.py @@ -67,8 +67,8 @@ class JSONRPC(asyncio.Protocol, LoggedClass): Assumes JSON messages are newline-separated and that newlines cannot appear in the JSON other than to separate lines. Incoming - messages are queued on the messages queue for later asynchronous - processing, and should be passed to the handle_request() function. + requests are passed to enqueue_request(), which should arrange for + their asynchronous handling via the request's process() method. Derived classes may want to override connection_made() and connection_lost() but should be sure to call the implementation in @@ -145,7 +145,6 @@ class JSONRPC(asyncio.Protocol, LoggedClass): self.send_size = 0 self.error_count = 0 self.peer_info = None - self.messages = asyncio.Queue() # Sends longer than max_send are prevented, instead returning # an oversized request error to other end of the network # connection. The request causing it is logged. Values under @@ -408,7 +407,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): # --- derived classes are intended to override these functions def enqueue_request(self, request): '''Enqueue a request for later asynchronous processing.''' - self.messages.put_nowait(request) + raise NotImplementedError async def handle_notification(self, method, params): '''Handle a notification.''' diff --git a/server/protocol.py b/server/protocol.py index 07a6533..740590e 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -14,6 +14,7 @@ import json import ssl import time import traceback +from bisect import bisect_left from collections import defaultdict, namedtuple from functools import partial @@ -217,6 +218,7 @@ class ServerManager(util.LoggedClass): ''' MgrTask = namedtuple('MgrTask', 'session task') + N = 5 class NotificationRequest(object): def __init__(self, fn_call): @@ -231,6 +233,7 @@ class ServerManager(util.LoggedClass): self.env = env self.servers = [] self.sessions = {} + self.groups = defaultdict(set) self.txs_sent = 0 self.next_log_sessions = 0 self.max_subs = env.max_subs @@ -238,9 +241,13 @@ class ServerManager(util.LoggedClass): self.next_stale_check = 0 self.history_cache = pylru.lrucache(256) self.header_cache = pylru.lrucache(8) + self.queue = asyncio.PriorityQueue() + self.delayed_queue = [] + self.next_request_id = 0 self.height = 0 self.futures = [] env.max_send = max(350000, env.max_send) + self.setup_bands() self.logger.info('session timeout: {:,d} seconds' .format(env.session_timeout)) self.logger.info('session bandwidth limit {:,d} bytes' @@ -266,6 +273,50 @@ class ServerManager(util.LoggedClass): ''' return self.mempool.value(hash168) + async def serve_requests(self): + '''Asynchronously run through the task queue.''' + while True: + priority_, id_, request = await self.queue.get() + try: + await request.process() + except asyncio.CancelledError: + break + except Exception: + # Getting here should probably be considered a bug and fixed + self.log_error('error handling request {}'.format(request)) + traceback.print_exc() + + def setup_bands(self): + bands = [] + limit = env.bandwidth_limit + for n in range(self.N): + bands.append(limit) + limit //= 4 + limit = env.bandwidth_limit + for n in range(self.N): + limit += limit // 2 + bands.append(limit) + self.bands = sorted(bands) + self.logger.info('bands: {}'.format(self.bands)) + + def session_priority(self, session): + if isinstance(session, LocalRPC): + return 0 + group_bandwidth = sum(s.bandwidth_used for s in self.sessions[session]) + return (bisect_left(self.bands, session.bandwidth_used) + + bisect_left(self.bands, group_bandwidth) + 1) // 2 + + def enqueue_request(self, session, request): + priority = self.session_priority(session) + item = (priority, self.next_request_id, request) + self.next_request_id += 1 + + secs = priority - self.N + if secs >= 0: + self.delayed_queue.append((time.time() + secs, item)) + else: + self.queue.put_nowait(item) + async def main_loop(self): '''Server manager main loop.''' def add_future(coro): @@ -277,6 +328,8 @@ class ServerManager(util.LoggedClass): add_future(self.mempool.main_loop(self.bp.event)) add_future(self.irc.start(self.bp.event)) add_future(self.start_servers(self.bp.event)) + for n in range(4): + add_future(self.serve_requests()) for future in asyncio.as_completed(self.futures): try: @@ -403,27 +456,24 @@ class ServerManager(util.LoggedClass): .format(len(self.sessions))) def add_session(self, session): - self.clear_stale_sessions() - coro = session.serve_requests() - future = asyncio.ensure_future(coro) - self.sessions[session] = future - session.log_info('connection from {}, {:,d} total' - .format(session.peername(), len(self.sessions))) # Some connections are acknowledged after the servers are closed if not self.servers: - self.close_session(session) + return + self.clear_stale_sessions() + group = self.groups[int(self.start - self.manager.start) // 60] + group.add(session) + self.sessions[session] = group + session.log_info('connection from {}, {:,d} total' + .format(session.peername(), len(self.sessions))) def remove_session(self, session): - # It might have been forcefully removed earlier by close_session() - if session in self.sessions: - self.subscription_count -= session.sub_count() - future = self.sessions.pop(session) - future.cancel() + group = self.sessions.pop(session) + group.remove(session) + self.subscription_count -= session.sub_count() def close_session(self, session): '''Close the session's transport and cancel its future.''' session.transport.close() - self.sessions[session].cancel() return 'disconnected {:d}'.format(session.id_) def toggle_logging(self, session): @@ -436,6 +486,9 @@ class ServerManager(util.LoggedClass): now = time.time() if now > self.next_stale_check: self.next_stale_check = now + 60 + # Clear out empty groups + for key in [k for k, v in self.groups.items() if not v]: + del self.groups[k] cutoff = now - self.env.session_timeout stale = [session for session in self.sessions if session.last_recv < cutoff @@ -465,6 +518,7 @@ class ServerManager(util.LoggedClass): 'blocks': self.bp.db_height, 'closing': len([s for s in self.sessions if s.is_closing()]), 'errors': sum(s.error_count for s in self.sessions), + 'logged': len([s for s in self.sessions if s.log_me]), 'peers': len(self.irc.peers), 'sessions': self.session_count(), 'txs_sent': self.txs_sent, @@ -482,9 +536,9 @@ class ServerManager(util.LoggedClass): return ('{:3d}:{:02d}:{:02d}' .format(t // 3600, (t % 3600) // 60, t % 60)) - fmt = ('{:<6} {:<3} {:>23} {:>15} {:>7} ' + fmt = ('{:<6} {:<5} {:>23} {:>15} {:>7} ' '{:>7} {:>7} {:>7} {:>7} {:>5} {:>9}') - yield fmt.format('ID', 'Flg', 'Peer', 'Client', 'Subs', + yield fmt.format('ID', 'Flags', 'Peer', 'Client', 'Subs', 'Recv', 'Recv KB', 'Sent', 'Sent KB', 'Txs', 'Time') for (id_, flags, peer, subs, client, recv_count, recv_size, @@ -501,7 +555,7 @@ class ServerManager(util.LoggedClass): def session_data(self, for_log): '''Returned to the RPC 'sessions' call.''' now = time.time() - sessions = sorted(self.sessions.keys(), key=lambda s: s.start) + sessions = sorted(self.sessions, key=lambda s: s.start) return [(session.id_, session.flags(), session.peername(for_log=for_log), @@ -577,7 +631,7 @@ class Session(JSONRPC): self.max_send = env.max_send self.bandwidth_limit = env.bandwidth_limit self.txs_sent = 0 - self.bucket = int(self.start - self.manager.start) // 60 + self.priority = 1 def is_closing(self): '''True if this session is closing.''' @@ -592,6 +646,9 @@ class Session(JSONRPC): status += 'L' return status + def enqueue_request(self, request): + self.manager.enqueue_request(self, request) + def connection_made(self, transport): '''Handle an incoming client connection.''' super().connection_made(transport) @@ -615,19 +672,6 @@ class Session(JSONRPC): return await handler(params) - async def serve_requests(self): - '''Asynchronously run through the task queue.''' - while True: - request = await self.messages.get() - try: - await request.process() - except asyncio.CancelledError: - break - except Exception: - # Getting here should probably be considered a bug and fixed - self.log_error('error handling request {}'.format(request)) - traceback.print_exc() - def sub_count(self): return 0