From cbb1e504cc37de69096dad5039c2b89bb762cf34 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 8 Dec 2016 06:19:11 +0900 Subject: [PATCH] Cache headers. --- lib/jsonrpc.py | 94 ++++++++++++++++++++++++---------------------- server/protocol.py | 60 ++++++++++++++++------------- 2 files changed, 83 insertions(+), 71 deletions(-) diff --git a/lib/jsonrpc.py b/lib/jsonrpc.py index 0496dee..7773c96 100644 --- a/lib/jsonrpc.py +++ b/lib/jsonrpc.py @@ -15,6 +15,53 @@ import time from lib.util import LoggedClass +class SingleRequest(object): + '''An object that represents a single request.''' + def __init__(self, session, payload): + self.payload = payload + self.session = session + + async def process(self): + '''Asynchronously handle the JSON request.''' + binary = await self.session.process_single_payload(self.payload) + if binary: + self.session._send_bytes(binary) + + +class BatchRequest(object): + '''An object that represents a batch request and its processing state. + + Batches are processed in parts chunks. + ''' + + CUHNK_SIZE = 3 + + def __init__(self, session, payload): + self.session = session + self.payload = payload + self.done = 0 + self.parts = [] + + async def process(self): + '''Asynchronously handle the JSON batch according to the JSON 2.0 + spec.''' + for n in range(self.CHUNK_SIZE): + if self.done >= len(self.payload): + if self.parts: + binary = b'[' + b', '.join(self.parts) + b']' + self.session._send_bytes(binary) + return + item = self.payload[self.done] + part = await self.session.process_single_payload(item) + if part: + self.parts.append(part) + self.done += 1 + + # Re-enqueue to continue the rest later + self.session.enqueue_request(self) + return b'' + + class JSONRPC(asyncio.Protocol, LoggedClass): '''Manages a JSONRPC connection. @@ -53,48 +100,6 @@ class JSONRPC(asyncio.Protocol, LoggedClass): self.msg = msg self.code = code - class SingleRequest(object): - '''An object that represents a single request.''' - def __init__(self, session, payload): - self.payload = payload - self.session = session - - async def process(self): - '''Asynchronously handle the JSON request.''' - binary = await self.session.process_single_payload(self.payload) - if binary: - self.session._send_bytes(binary) - - class BatchRequest(object): - '''An object that represents a batch request and its processing - state.''' - def __init__(self, session, payload): - self.session = session - self.payload = payload - self.done = 0 - self.parts = [] - - async def process(self): - '''Asynchronously handle the JSON batch according to the JSON 2.0 - spec.''' - if not self.payload: - raise JSONRPC.RPCError('empty batch', self.INVALID_REQUEST) - for n in range(self.session.batch_limit): - if self.done >= len(self.payload): - if self.parts: - binary = b'[' + b', '.join(self.parts) + b']' - self.session._send_bytes(binary) - return - item = self.payload[self.done] - part = await self.session.process_single_payload(item) - if part: - self.parts.append(part) - self.done += 1 - - # Re-enqueue to continue the rest later - self.session.enqueue_request(self) - return b'' - @classmethod def request_payload(cls, method, id_, params=None): payload = {'jsonrpc': '2.0', 'id': id_, 'method': method} @@ -129,7 +134,6 @@ class JSONRPC(asyncio.Protocol, LoggedClass): self.bandwidth_interval = 3600 self.bandwidth_used = 0 self.bandwidth_limit = 5000000 - self.batch_limit = 4 self.transport = None # Parts of an incomplete JSON line. We buffer them until # getting a newline. @@ -239,9 +243,9 @@ class JSONRPC(asyncio.Protocol, LoggedClass): if not message: self.send_json_error('empty batch', self.INVALID_REQUEST) return - request = self.BatchRequest(self, message) + request = BatchRequest(self, message) else: - request = self.SingleRequest(self, message) + request = SingleRequest(self, message) '''Queue the request for asynchronous handling.''' self.enqueue_request(request) diff --git a/server/protocol.py b/server/protocol.py index 2711ee9..07a6533 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -218,8 +218,13 @@ class ServerManager(util.LoggedClass): MgrTask = namedtuple('MgrTask', 'session task') + class NotificationRequest(object): + def __init__(self, fn_call): + self.process = fn_call + def __init__(self, env): super().__init__() + self.start = time.time() self.bp = BlockProcessor(self, env) self.mempool = MemPool(self.bp.daemon, env.coin, self.bp, self) self.irc = IRC(env) @@ -231,7 +236,9 @@ class ServerManager(util.LoggedClass): self.max_subs = env.max_subs self.subscription_count = 0 self.next_stale_check = 0 - self.history_cache = pylru.lrucache(128) + self.history_cache = pylru.lrucache(256) + self.header_cache = pylru.lrucache(8) + self.height = 0 self.futures = [] env.max_send = max(350000, env.max_send) self.logger.info('session timeout: {:,d} seconds' @@ -316,21 +323,19 @@ class ServerManager(util.LoggedClass): sslc.load_cert_chain(env.ssl_certfile, keyfile=env.ssl_keyfile) await self.start_server('SSL', env.host, env.ssl_port, ssl=sslc) - class NotificationRequest(object): - def __init__(self, fn_call): - self.process = fn_call - def notify(self, touched): '''Notify sessions about height changes and touched addresses.''' - # Remove invalidated history cache + # Invalidate caches hc = self.history_cache for hash168 in set(hc).intersection(touched): del hc[hash168] - cache = {} + if self.bp.db_height != self.height: + self.height = self.bp.db_height + self.header_cache.clear() + for session in self.sessions: if isinstance(session, ElectrumX): - fn_call = partial(session.notify, self.bp.db_height, touched, - cache) + fn_call = partial(session.notify, self.bp.db_height, touched) session.enqueue_request(self.NotificationRequest(fn_call)) # Periodically log sessions if self.env.log_sessions and time.time() > self.next_log_sessions: @@ -340,6 +345,17 @@ class ServerManager(util.LoggedClass): self.logger.info(json.dumps(self.server_summary())) self.next_log_sessions = time.time() + self.env.log_sessions + def electrum_header(self, height): + '''Return the binary header at the given height.''' + if not 0 <= height <= self.bp.db_height: + raise self.RPCError('height {:,d} out of range'.format(height)) + if height in self.header_cache: + return self.header_cache[height] + header = self.bp.read_headers(height, 1) + header = self.env.coin.electrum_header(header, height) + self.header_cache[height] = header + return header + async def async_get_history(self, hash168): if hash168 in self.history_cache: return self.history_cache[hash168] @@ -561,6 +577,7 @@ class Session(JSONRPC): self.max_send = env.max_send self.bandwidth_limit = env.bandwidth_limit self.txs_sent = 0 + self.bucket = int(self.start - self.manager.start) // 60 def is_closing(self): '''True if this session is closing.''' @@ -676,7 +693,7 @@ class ElectrumX(Session): def sub_count(self): return len(self.hash168s) - async def notify(self, height, touched, cache): + async def notify(self, height, touched): '''Notify the client about changes in height and touched addresses. Cache is a shared cache for this update. @@ -684,13 +701,11 @@ class ElectrumX(Session): if height != self.notified_height: self.notified_height = height if self.subscribe_headers: - key = 'headers_payload' - if key not in cache: - cache[key] = self.notification_payload( - 'blockchain.headers.subscribe', - (self.electrum_header(height), ), - ) - self.encode_and_send_payload(cache[key]) + payload = self.notification_payload( + 'blockchain.headers.subscribe', + (self.manager.electrum_header(height), ), + ) + self.encode_and_send_payload(payload) if self.subscribe_height: payload = self.notification_payload( @@ -717,14 +732,7 @@ class ElectrumX(Session): def current_electrum_header(self): '''Used as response to a headers subscription request.''' - return self.electrum_header(self.height()) - - def electrum_header(self, height): - '''Return the binary header at the given height.''' - if not 0 <= height <= self.height(): - raise self.RPCError('height {:,d} out of range'.format(height)) - header = self.bp.read_headers(height, 1) - return self.coin.electrum_header(header, height) + return self.manager.electrum_header(self.height()) async def address_status(self, hash168): '''Returns status as 32 bytes.''' @@ -848,7 +856,7 @@ class ElectrumX(Session): async def block_get_header(self, params): height = self.params_to_non_negative_integer(params) - return self.electrum_header(height) + return self.manager.electrum_header(height) async def estimatefee(self, params): return await self.daemon_request('estimatefee', params)