diff --git a/server/protocol.py b/server/protocol.py index 6eda042..7a77b1e 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -108,9 +108,11 @@ class ServerManager(LoggedClass): def notify(self, height, touched): '''Notify sessions about height changes and touched addresses.''' - sessions = [session for session in self.sessions - if isinstance(session, ElectrumX)] - ElectrumX.notify(sessions, height, touched) + cache = {} + for session in self.sessions: + if isinstance(session, ElectrumX): + # Use a tuple to distinguish from JSON + session.jobs.put_nowait((height, touched, cache)) def stop(self): '''Close listening servers.''' @@ -196,7 +198,7 @@ class Session(JSONRPC): self.coin = bp.coin self.kind = kind self.hash168s = set() - self.requests = asyncio.Queue() + self.jobs = asyncio.Queue() self.current_task = None self.client = 'unknown' @@ -222,26 +224,23 @@ class Session(JSONRPC): def on_json_request(self, request): '''Queue the request for asynchronous handling.''' - self.requests.put_nowait(request) + self.jobs.put_nowait(request) async def serve_requests(self): '''Asynchronously run through the task queue.''' while True: await asyncio.sleep(0) - request = await self.requests.get() + job = await self.jobs.get() try: - start = time.time() - await self.handle_json_request(request) - secs = time.time() - start - if secs > 1: - self.logger.warning('slow request for {} took {:.1f}s: {}' - .format(self.peername(), secs, - request)) + if isinstance(job, tuple): # Height / mempool notification + await self.notify(*job) + else: + await self.handle_json_request(job) except asyncio.CancelledError: break except Exception: # Getting here should probably be considered a bug and fixed - self.logger.error('error handling request {}'.format(request)) + self.logger.error('error handling request {}'.format(job)) traceback.print_exc() def peername(self, *, for_log=True): @@ -324,36 +323,41 @@ class ElectrumX(Session): for prefix, suffixes in rpcs for suffix in suffixes.split()} - @classmethod - def notify(cls, sessions, height, touched): - headers_payload = height_payload = None + async def notify(self, height, touched, cache): + '''Notify the client about changes in height and touched addresses. - for session in sessions: - if height != session.notified_height: - session.notified_height = height - if session.subscribe_headers: - if headers_payload is None: - headers_payload = json_notification_payload( - 'blockchain.headers.subscribe', - (session.electrum_header(height), ), - ) - session.send_json(headers_payload) + Cache is a shared cache for this update. + ''' + if height != self.notified_height: + self.notified_height = height + if self.subscribe_headers: + key = 'headers_payload' + if key not in cache: + cache[key] = json_notification_payload( + 'blockchain.headers.subscribe', + (self.electrum_header(height), ), + ) + self.send_json(cache[key]) - if session.subscribe_height: - if height_payload is None: - height_payload = json_notification_payload( - 'blockchain.numblocks.subscribe', - (height, ), - ) - session.send_json(height_payload) - - hash168_to_address = session.coin.hash168_to_address - for hash168 in session.hash168s.intersection(touched): - address = hash168_to_address(hash168) - status = session.address_status(hash168) + if self.subscribe_height: payload = json_notification_payload( - 'blockchain.address.subscribe', (address, status)) - session.send_json(payload) + 'blockchain.numblocks.subscribe', + (height, ), + ) + self.send_json(payload) + + hash168_to_address = self.coin.hash168_to_address + matches = self.hash168s.intersection(touched) + for hash168 in matches: + address = hash168_to_address(hash168) + status = self.address_status(hash168) + payload = json_notification_payload( + 'blockchain.address.subscribe', (address, status)) + self.send_json(payload) + + if matches: + self.logger.info('notified {} of {} addresses' + .format(self.peername(), len(matches))) def height(self): '''Return the block processor's current height.'''