diff --git a/electrumx_rpc.py b/electrumx_rpc.py index 7cc2fbb..deecd1e 100755 --- a/electrumx_rpc.py +++ b/electrumx_rpc.py @@ -35,8 +35,7 @@ class RPCClient(JSONRPC): self.max_buffer_size = 5000000 if params: params = [params] - payload = self.request_payload(method, id_=method, params=params) - self.encode_and_send_payload(payload) + self.send_request(method, method, params) future = asyncio.ensure_future(self.queue.get()) for f in asyncio.as_completed([future], timeout=timeout): @@ -80,6 +79,7 @@ def main(): except OSError: print('error connecting - is ElectrumX catching up or not running?') finally: + loop.stop() loop.close() diff --git a/lib/jsonrpc.py b/lib/jsonrpc.py index 91cb405..7657cfb 100644 --- a/lib/jsonrpc.py +++ b/lib/jsonrpc.py @@ -13,6 +13,7 @@ import json import numbers import time import traceback +from functools import partial from lib.util import LoggedClass @@ -121,7 +122,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): NEXT_SESSION_ID = 0 @classmethod - def request_payload(cls, method, id_, params=None): + def request_payload(cls, id_, method, params=None): payload = {'jsonrpc': '2.0', 'id': id_, 'method': method} if params: payload['params'] = params @@ -131,10 +132,6 @@ class JSONRPC(asyncio.Protocol, LoggedClass): def response_payload(cls, result, id_): return {'jsonrpc': '2.0', 'result': result, 'id': id_} - @classmethod - def notification_payload(cls, method, params=None): - return cls.request_payload(method, None, params) - @classmethod def error_payload(cls, message, code, id_=None): error = {'message': message, 'code': code} @@ -166,6 +163,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): def __init__(self): super().__init__() + self.send_notification = partial(self.send_request, None) self.start = time.time() self.stop = 0 self.last_recv = self.start @@ -316,6 +314,18 @@ class JSONRPC(asyncio.Protocol, LoggedClass): '''Send a JSON error.''' self._send_bytes(self.json_error_bytes(message, code, id_)) + def send_request(self, id_, method, params=None): + '''Send a request. If id_ is None it is a notification.''' + self.encode_and_send_payload(self.request_payload(id_, method, params)) + + def send_notifications(self, mp_iterable): + '''Send an iterable of (method, params) notification pairs. + + A 1-tuple is also valid in which case there are no params.''' + # TODO: maybe send batches if remote side supports it + for pair in mp_iterable: + self.send_notification(*pair) + def encode_payload(self, payload): assert isinstance(payload, dict) @@ -353,10 +363,6 @@ class JSONRPC(asyncio.Protocol, LoggedClass): '''Encode the payload and send it.''' self._send_bytes(self.encode_payload(payload)) - def json_notification_bytes(self, method, params): - '''Return the bytes of a json notification.''' - return self.encode_payload(self.notification_payload(method, params)) - def json_request_bytes(self, method, id_, params=None): '''Return the bytes of a JSON request.''' return self.encode_payload(self.request_payload(method, id_, params)) diff --git a/server/session.py b/server/session.py index 23cf9dc..46ca43e 100644 --- a/server/session.py +++ b/server/session.py @@ -135,32 +135,29 @@ class ElectrumX(Session): Cache is a shared cache for this update. ''' + controller = self.controller + pairs = [] + if height != self.notified_height: self.notified_height = height if self.subscribe_headers: - payload = self.notification_payload( - 'blockchain.headers.subscribe', - (self.controller.electrum_header(height), ), - ) - self.encode_and_send_payload(payload) + args = (controller.electrum_header(height), ) + pairs.append(('blockchain.headers.subscribe', args)) if self.subscribe_height: - payload = self.notification_payload( - 'blockchain.numblocks.subscribe', - (height, ), - ) - self.encode_and_send_payload(payload) + pairs.append(('blockchain.numblocks.subscribe', (height, ))) matches = touched.intersection(self.hashX_subs) for hashX in matches: address = self.hashX_subs[hashX] - status = await self.controller.address_status(hashX) - payload = self.notification_payload( - 'blockchain.address.subscribe', (address, status)) - self.encode_and_send_payload(payload) + status = await controller.address_status(hashX) + pairs.append(('blockchain.address.subscribe', (address, status))) + self.send_notifications(pairs) if matches: - self.log_info('notified of {:,d} addresses'.format(len(matches))) + es = '' if len(matches) == 1 else 'es' + self.log_info('notified of {:,d} address{}' + .format(len(matches), es)) def height(self): '''Return the current flushed database height.'''