Rewrite lib/jsonrpc.py to be a useful client too
Use for an improved electrumx_rpc.py, so it now handles responses spread over several packets. Also added a timeout of 5 seconds. Fixes #43
This commit is contained in:
parent
dcf4d237a2
commit
42c00d32d2
@ -16,24 +16,30 @@ import json
|
||||
from functools import partial
|
||||
from os import environ
|
||||
|
||||
from lib.jsonrpc import JSONRPC
|
||||
|
||||
class RPCClient(asyncio.Protocol):
|
||||
|
||||
def __init__(self, loop):
|
||||
self.loop = loop
|
||||
self.method = None
|
||||
class RPCClient(JSONRPC):
|
||||
|
||||
def connection_made(self, transport):
|
||||
self.transport = transport
|
||||
async def send_and_wait(self, method, params, timeout=None):
|
||||
self.send_json_request(method, id_=method, params=params)
|
||||
|
||||
def connection_lost(self, exc):
|
||||
self.loop.stop()
|
||||
future = asyncio.ensure_future(self.messages.get())
|
||||
for f in asyncio.as_completed([future], timeout=timeout):
|
||||
try:
|
||||
message = await f
|
||||
except asyncio.TimeoutError:
|
||||
future.cancel()
|
||||
print('request timed out')
|
||||
else:
|
||||
await self.handle_message(message)
|
||||
|
||||
def send(self, method, params):
|
||||
self.method = method
|
||||
payload = {'method': method, 'params': params, 'id': 'RPC'}
|
||||
data = json.dumps(payload) + '\n'
|
||||
self.transport.write(data.encode())
|
||||
async def handle_response(self, result, error, method):
|
||||
if result and method == 'sessions':
|
||||
self.print_sessions(result)
|
||||
else:
|
||||
value = {'error': error} if error else result
|
||||
print(json.dumps(value, indent=4, sort_keys=True))
|
||||
|
||||
def print_sessions(self, result):
|
||||
def data_fmt(count, size):
|
||||
@ -58,17 +64,6 @@ class RPCClient(asyncio.Protocol):
|
||||
'{:,d}'.format(error_count),
|
||||
time_fmt(time)))
|
||||
|
||||
def data_received(self, data):
|
||||
payload = json.loads(data.decode())
|
||||
self.transport.close()
|
||||
result = payload['result']
|
||||
error = payload['error']
|
||||
if not error and self.method == 'sessions':
|
||||
self.print_sessions(result)
|
||||
else:
|
||||
value = {'error': error} if error else result
|
||||
print(json.dumps(value, indent=4, sort_keys=True))
|
||||
|
||||
def main():
|
||||
'''Send the RPC command to the server and print the result.'''
|
||||
parser = argparse.ArgumentParser('Send electrumx an RPC command' )
|
||||
@ -84,12 +79,11 @@ def main():
|
||||
args.port = int(environ.get('ELECTRUMX_RPC_PORT', 8000))
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
proto_factory = partial(RPCClient, loop)
|
||||
coro = loop.create_connection(proto_factory, 'localhost', args.port)
|
||||
coro = loop.create_connection(RPCClient, 'localhost', args.port)
|
||||
try:
|
||||
transport, protocol = loop.run_until_complete(coro)
|
||||
protocol.send(args.command[0], args.param)
|
||||
loop.run_forever()
|
||||
coro = protocol.send_and_wait(args.command[0], args.param, timeout=5)
|
||||
loop.run_until_complete(coro)
|
||||
except OSError:
|
||||
print('error connecting - is ElectrumX catching up or not running?')
|
||||
finally:
|
||||
|
||||
162
lib/jsonrpc.py
162
lib/jsonrpc.py
@ -15,38 +15,45 @@ import time
|
||||
from lib.util import LoggedClass
|
||||
|
||||
|
||||
def json_result_payload(result, id_):
|
||||
def json_response_payload(result, id_):
|
||||
# We should not respond to notifications
|
||||
assert id_ is not None
|
||||
return {'jsonrpc': '2.0', 'error': None, 'result': result, 'id': id_}
|
||||
return {'jsonrpc': '2.0', 'result': result, 'id': id_}
|
||||
|
||||
def json_error_payload(message, code, id_=None):
|
||||
error = {'message': message, 'code': code}
|
||||
return {'jsonrpc': '2.0', 'error': error, 'result': None, 'id': id_}
|
||||
return {'jsonrpc': '2.0', 'error': error, 'id': id_}
|
||||
|
||||
def json_notification_payload(method, params):
|
||||
return {'jsonrpc': '2.0', 'id': None, 'method': method, 'params': params}
|
||||
def json_request_payload(method, id_, params=None):
|
||||
payload = {'jsonrpc': '2.0', 'id': id_, 'method': method}
|
||||
if params:
|
||||
payload['params'] = params
|
||||
return payload
|
||||
|
||||
def json_notification_payload(method, params=None):
|
||||
return json_request_payload(method, None, params)
|
||||
|
||||
|
||||
class JSONRPC(asyncio.Protocol, LoggedClass):
|
||||
'''Manages a JSONRPC connection.
|
||||
|
||||
Assumes JSON messages are newline-separated and that newlines
|
||||
cannot appear in the JSON other than to separate lines.
|
||||
cannot appear in the JSON other than to separate lines. Incoming
|
||||
messages are queued on the messages queue for later asynchronous
|
||||
processing, and should be passed to the handle_message() function.
|
||||
|
||||
Derived classes need to implement the synchronous functions
|
||||
on_json_request() and method_handler(). They probably also want
|
||||
to override connection_made() and connection_lost() but should be
|
||||
sure to call the implementation in this base class first.
|
||||
Derived classes may want to override connection_made() and
|
||||
connection_lost() but should be sure to call the implementation in
|
||||
this base class first. They will also want to implement some or
|
||||
all of the asynchronous functions handle_notification(),
|
||||
handle_response() and handle_request().
|
||||
|
||||
on_json_request() is passed a JSON request as a python object
|
||||
after decoding. It should arrange to pass on to the asynchronous
|
||||
handle_json_request() method.
|
||||
handle_request() returns the result to pass over the network, and
|
||||
must raise an RPCError if there is an error.
|
||||
handle_notification() and handle_response() should not return
|
||||
anything or raise any exceptions. All three functions have
|
||||
default "ignore" implementations supplied by this class.
|
||||
|
||||
method_handler() takes a method string and should return a function
|
||||
that can be passed a parameters array, or None for an unknown method.
|
||||
|
||||
Handlers should raise an RPCError on error.
|
||||
'''
|
||||
|
||||
# See http://www.jsonrpc.org/specification
|
||||
@ -54,7 +61,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
|
||||
INVALID_REQUEST = -32600
|
||||
METHOD_NOT_FOUND = -32601
|
||||
INVALID_PARAMS = -32602
|
||||
INTERAL_ERROR = -32603
|
||||
INTERNAL_ERROR = -32603
|
||||
|
||||
ID_TYPES = (type(None), str, numbers.Number)
|
||||
|
||||
@ -65,7 +72,6 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
|
||||
self.msg = msg
|
||||
self.code = code
|
||||
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.start = time.time()
|
||||
@ -80,6 +86,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
|
||||
self.send_size = 0
|
||||
self.error_count = 0
|
||||
self.peer_info = None
|
||||
self.messages = asyncio.Queue()
|
||||
|
||||
def connection_made(self, transport):
|
||||
'''Handle an incoming client connection.'''
|
||||
@ -132,15 +139,20 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
|
||||
self.transport.close()
|
||||
return
|
||||
|
||||
self.on_json_request(message)
|
||||
'''Queue the request for asynchronous handling.'''
|
||||
self.messages.put_nowait(message)
|
||||
|
||||
def send_json_notification(self, method, params):
|
||||
'''Create a json notification.'''
|
||||
self.send_json(json_notification_payload(method, params))
|
||||
|
||||
def send_json_result(self, result, id_):
|
||||
def send_json_request(self, method, id_, params=None):
|
||||
'''Send a JSON request.'''
|
||||
self.send_json(json_request_payload(method, id_, params))
|
||||
|
||||
def send_json_response(self, result, id_):
|
||||
'''Send a JSON result.'''
|
||||
self.send_json(json_result_payload(result, id_))
|
||||
self.send_json(json_response_payload(result, id_))
|
||||
|
||||
def send_json_error(self, message, code, id_=None):
|
||||
'''Send a JSON error.'''
|
||||
@ -167,20 +179,20 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
|
||||
self.send_size += len(data)
|
||||
self.transport.write(data)
|
||||
|
||||
async def handle_json_request(self, request):
|
||||
'''Asynchronously handle a JSON request.
|
||||
async def handle_message(self, message):
|
||||
'''Asynchronously handle a JSON request or response.
|
||||
|
||||
Handles batch requests.
|
||||
Handles batches according to the JSON 2.0 spec.
|
||||
'''
|
||||
if isinstance(request, list):
|
||||
payload = await self.batch_request_payload(request)
|
||||
if isinstance(message, list):
|
||||
payload = await self.batch_payload(message)
|
||||
else:
|
||||
payload = await self.single_request_payload(request)
|
||||
payload = await self.single_payload(message)
|
||||
|
||||
if payload:
|
||||
self.send_json(payload)
|
||||
|
||||
async def batch_request_payload(self, batch):
|
||||
async def batch_payload(self, batch):
|
||||
'''Return the JSON payload corresponding to a batch JSON request.'''
|
||||
# Batches must have at least one request.
|
||||
if not batch:
|
||||
@ -189,38 +201,39 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
|
||||
|
||||
# PYTHON 3.6: use asynchronous comprehensions when supported
|
||||
payload = []
|
||||
for item in request:
|
||||
item_payload = await self.single_request_payload(item)
|
||||
if item_payload:
|
||||
payload.append(item_payload)
|
||||
for message in batch:
|
||||
message_payload = await self.single_payload(message)
|
||||
if message_payload:
|
||||
payload.append(message_payload)
|
||||
return payload
|
||||
|
||||
async def single_request_payload(self, request):
|
||||
'''Return the JSON payload corresponding to a single JSON request.
|
||||
async def single_payload(self, message):
|
||||
'''Return the JSON payload corresponding to a single JSON request,
|
||||
response or notification.
|
||||
|
||||
Return None if the request is a notification.
|
||||
Return None if the request is a notification or a response.
|
||||
'''
|
||||
if not isinstance(request, dict):
|
||||
if not isinstance(message, dict):
|
||||
return json_error_payload('request must be a dict',
|
||||
self.INVALID_REQUEST)
|
||||
|
||||
id_ = request.get('id')
|
||||
if not 'id' in message:
|
||||
return await self.json_notification(message)
|
||||
|
||||
id_ = message['id']
|
||||
if not isinstance(id_, self.ID_TYPES):
|
||||
return json_error_payload('invalid id: {}'.format(id_),
|
||||
self.INVALID_REQUEST)
|
||||
|
||||
try:
|
||||
result = await self.method_result(request.get('method'),
|
||||
request.get('params', []))
|
||||
if id_ is None:
|
||||
return None
|
||||
return json_result_payload(result, id_)
|
||||
except self.RPCError as e:
|
||||
if id_ is None:
|
||||
return None
|
||||
return json_error_payload(e.msg, e.code, id_)
|
||||
if 'method' in message:
|
||||
return await self.json_request(message)
|
||||
|
||||
return await self.json_response(message)
|
||||
|
||||
def method_and_params(self, message):
|
||||
method = message.get('method')
|
||||
params = message.get('params', [])
|
||||
|
||||
async def method_result(self, method, params):
|
||||
if not isinstance(method, str):
|
||||
raise self.RPCError('invalid method: {}'.format(method),
|
||||
self.INVALID_REQUEST)
|
||||
@ -229,17 +242,46 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
|
||||
raise self.RPCError('params should be an array',
|
||||
self.INVALID_REQUEST)
|
||||
|
||||
handler = self.method_handler(method)
|
||||
if not handler:
|
||||
raise self.RPCError('unknown method: "{}"'.format(method),
|
||||
self.METHOD_NOT_FOUND)
|
||||
return method, params
|
||||
|
||||
return await handler(params)
|
||||
async def json_notification(self, message):
|
||||
try:
|
||||
method, params = self.method_and_params(message)
|
||||
except RCPError:
|
||||
pass
|
||||
else:
|
||||
self.handle_notification(method, params)
|
||||
return None
|
||||
|
||||
def on_json_request(self, request):
|
||||
raise NotImplementedError('on_json_request in class {}'.
|
||||
format(self.__class__.__name__))
|
||||
async def json_request(self, message):
|
||||
try:
|
||||
method, params = self.method_and_params(message)
|
||||
result = await self.handle_request(method, params)
|
||||
return json_response_payload(result, message['id'])
|
||||
except self.RPCError as e:
|
||||
return json_error_payload(e.msg, e.code, message['id'])
|
||||
|
||||
def method_handler(self, method):
|
||||
raise NotImplementedError('method_handler in class {}'.
|
||||
format(self.__class__.__name__))
|
||||
async def json_response(self, message):
|
||||
# Only one of result and error should exist; we go with 'error'
|
||||
# if both are supplied.
|
||||
if 'error' in message:
|
||||
await self.handle_response(None, message['error'], message['id'])
|
||||
elif 'result' in message:
|
||||
await self.handle_response(message['result'], None, message['id'])
|
||||
return None
|
||||
|
||||
def raise_unknown_method(method):
|
||||
'''Respond to a request with an unknown method.'''
|
||||
raise self.RPCError('unknown method: "{}"'.format(method),
|
||||
self.METHOD_NOT_FOUND)
|
||||
|
||||
# --- derived classes are intended to override these functions
|
||||
async def handle_notification(self, method, params):
|
||||
'''Handle a notification.'''
|
||||
|
||||
async def handle_request(self, method, params):
|
||||
'''Handle a request.'''
|
||||
return None
|
||||
|
||||
async def handle_response(self, result, error, id_):
|
||||
'''Handle a response.'''
|
||||
|
||||
@ -309,7 +309,7 @@ class ServerManager(util.LoggedClass):
|
||||
for session in self.sessions:
|
||||
if isinstance(session, ElectrumX):
|
||||
# Use a tuple to distinguish from JSON
|
||||
session.jobs.put_nowait((self.bp.height, touched, cache))
|
||||
session.messages.put_nowait((self.bp.height, touched, cache))
|
||||
|
||||
async def shutdown(self):
|
||||
'''Call to shutdown the servers. Returns when done.'''
|
||||
@ -420,7 +420,6 @@ class Session(JSONRPC):
|
||||
self.daemon = bp.daemon
|
||||
self.coin = bp.coin
|
||||
self.kind = kind
|
||||
self.jobs = asyncio.Queue()
|
||||
self.client = 'unknown'
|
||||
|
||||
def connection_made(self, transport):
|
||||
@ -438,29 +437,30 @@ class Session(JSONRPC):
|
||||
self.send_count, self.error_count))
|
||||
self.manager.remove_session(self)
|
||||
|
||||
def method_handler(self, method):
|
||||
'''Return the handler that will handle the RPC method.'''
|
||||
return self.handlers.get(method)
|
||||
async def handle_request(self, method, params):
|
||||
'''Handle a request.'''
|
||||
handler = self.handlers.get(method)
|
||||
if not handler:
|
||||
self.raise_unknown_method(method)
|
||||
|
||||
def on_json_request(self, request):
|
||||
'''Queue the request for asynchronous handling.'''
|
||||
self.jobs.put_nowait(request)
|
||||
return await handler(params)
|
||||
|
||||
async def serve_requests(self):
|
||||
'''Asynchronously run through the task queue.'''
|
||||
while True:
|
||||
await asyncio.sleep(0)
|
||||
job = await self.jobs.get()
|
||||
message = await self.messages.get()
|
||||
try:
|
||||
if isinstance(job, tuple): # Height / mempool notification
|
||||
await self.notify(*job)
|
||||
# Height / mempool notification?
|
||||
if isinstance(message, tuple):
|
||||
await self.notify(*message)
|
||||
else:
|
||||
await self.handle_json_request(job)
|
||||
await self.handle_message(message)
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except Exception:
|
||||
# Getting here should probably be considered a bug and fixed
|
||||
self.logger.error('error handling request {}'.format(job))
|
||||
self.logger.error('error handling request {}'.format(message))
|
||||
traceback.print_exc()
|
||||
|
||||
def peername(self, *, for_log=True):
|
||||
|
||||
Loading…
Reference in New Issue
Block a user