Clean up daemon interface.
This commit is contained in:
parent
7019b29baf
commit
9fbbc8bfdb
@ -89,7 +89,7 @@ class Prefetcher(LoggedClass):
|
|||||||
else:
|
else:
|
||||||
await asyncio.sleep(0)
|
await asyncio.sleep(0)
|
||||||
except DaemonError as e:
|
except DaemonError as e:
|
||||||
self.logger.info('ignoring daemon errors: {}'.format(e))
|
self.logger.info('ignoring daemon error: {}'.format(e))
|
||||||
|
|
||||||
async def _caught_up(self):
|
async def _caught_up(self):
|
||||||
'''Poll for new blocks and mempool state.
|
'''Poll for new blocks and mempool state.
|
||||||
|
|||||||
@ -24,70 +24,80 @@ class DaemonError(Exception):
|
|||||||
class Daemon(util.LoggedClass):
|
class Daemon(util.LoggedClass):
|
||||||
'''Handles connections to a daemon at the given URL.'''
|
'''Handles connections to a daemon at the given URL.'''
|
||||||
|
|
||||||
|
WARMING_UP = -28
|
||||||
|
|
||||||
def __init__(self, url):
|
def __init__(self, url):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
self.url = url
|
self.url = url
|
||||||
self._height = None
|
self._height = None
|
||||||
self.logger.info('connecting to daemon at URL {}'.format(url))
|
self.logger.info('connecting to daemon at URL {}'.format(url))
|
||||||
|
|
||||||
async def send_single(self, method, params=None):
|
@classmethod
|
||||||
payload = {'method': method}
|
def is_warming_up(cls, err):
|
||||||
if params:
|
if not isinstance(err, list):
|
||||||
payload['params'] = params
|
err = [err]
|
||||||
result, = await self.send((payload, ))
|
return any(elt.get('code') == cls.WARMING_UP for elt in err)
|
||||||
return result
|
|
||||||
|
|
||||||
async def send_many(self, mp_pairs):
|
|
||||||
if mp_pairs:
|
|
||||||
payload = [{'method': method, 'params': params}
|
|
||||||
for method, params in mp_pairs]
|
|
||||||
return await self.send(payload)
|
|
||||||
return []
|
|
||||||
|
|
||||||
async def send_vector(self, method, params_list):
|
|
||||||
if params_list:
|
|
||||||
payload = [{'method': method, 'params': params}
|
|
||||||
for params in params_list]
|
|
||||||
return await self.send(payload)
|
|
||||||
return []
|
|
||||||
|
|
||||||
async def send(self, payload):
|
async def send(self, payload):
|
||||||
assert isinstance(payload, (tuple, list))
|
'''Send a payload to be converted to JSON.'''
|
||||||
data = json.dumps(payload)
|
data = json.dumps(payload)
|
||||||
|
secs = 1
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
async with aiohttp.post(self.url, data=data) as resp:
|
async with aiohttp.post(self.url, data=data) as resp:
|
||||||
result = await resp.json()
|
result = await resp.json()
|
||||||
except asyncio.CancelledError:
|
if not self.is_warming_up(result):
|
||||||
raise
|
return result
|
||||||
except Exception as e:
|
msg = 'daemon is still warming up'
|
||||||
msg = 'aiohttp error: {}'.format(e)
|
except aiohttp.DisconnectedError as e:
|
||||||
secs = 3
|
msg = '{}: {}'.format(e.__class__.__name__, e)
|
||||||
else:
|
|
||||||
errs = tuple(item['error'] for item in result)
|
|
||||||
if not any(errs):
|
|
||||||
return tuple(item['result'] for item in result)
|
|
||||||
if any(err.get('code') == -28 for err in errs):
|
|
||||||
msg = 'daemon still warming up.'
|
|
||||||
secs = 30
|
|
||||||
else:
|
|
||||||
raise DaemonError(errs)
|
|
||||||
|
|
||||||
|
secs = min(180, secs * 2)
|
||||||
self.logger.error('{}. Sleeping {:d}s and trying again...'
|
self.logger.error('{}. Sleeping {:d}s and trying again...'
|
||||||
.format(msg, secs))
|
.format(msg, secs))
|
||||||
await asyncio.sleep(secs)
|
await asyncio.sleep(secs)
|
||||||
|
|
||||||
|
async def send_single(self, method, params=None):
|
||||||
|
'''Send a single request to the daemon.'''
|
||||||
|
payload = {'method': method}
|
||||||
|
if params:
|
||||||
|
payload['params'] = params
|
||||||
|
item = await self.send(payload)
|
||||||
|
if item['error']:
|
||||||
|
raise DaemonError(item['error'])
|
||||||
|
return item['result']
|
||||||
|
|
||||||
|
async def send_many(self, mp_iterable):
|
||||||
|
'''Send several requests at once.
|
||||||
|
|
||||||
|
The results are returned as a tuple.'''
|
||||||
|
payload = tuple({'method': m, 'params': p} for m, p in mp_iterable)
|
||||||
|
if payload:
|
||||||
|
items = await self.send(payload)
|
||||||
|
errs = tuple(item['error'] for item in items)
|
||||||
|
if any(errs):
|
||||||
|
raise DaemonError(errs)
|
||||||
|
return tuple(item['result'] for item in items)
|
||||||
|
return ()
|
||||||
|
|
||||||
|
async def send_vector(self, method, params_iterable):
|
||||||
|
'''Send several requests of the same method.
|
||||||
|
|
||||||
|
The results are returned as a tuple.'''
|
||||||
|
return await self.send_many((method, params)
|
||||||
|
for params in params_iterable)
|
||||||
|
|
||||||
async def block_hex_hashes(self, first, count):
|
async def block_hex_hashes(self, first, count):
|
||||||
'''Return the hex hashes of count block starting at height first.'''
|
'''Return the hex hashes of count block starting at height first.'''
|
||||||
param_lists = [[height] for height in range(first, first + count)]
|
params_iterable = ((h, ) for h in range(first, first + count))
|
||||||
return await self.send_vector('getblockhash', param_lists)
|
return await self.send_vector('getblockhash', params_iterable)
|
||||||
|
|
||||||
async def raw_blocks(self, hex_hashes):
|
async def raw_blocks(self, hex_hashes):
|
||||||
'''Return the raw binary blocks with the given hex hashes.'''
|
'''Return the raw binary blocks with the given hex hashes.'''
|
||||||
param_lists = [(h, False) for h in hex_hashes]
|
params_iterable = ((h, False) for h in hex_hashes)
|
||||||
blocks = await self.send_vector('getblock', param_lists)
|
blocks = await self.send_vector('getblock', params_iterable)
|
||||||
# Convert hex string to bytes
|
# Convert hex string to bytes
|
||||||
return [bytes.fromhex(block) for block in blocks]
|
return tuple(bytes.fromhex(block) for block in blocks)
|
||||||
|
|
||||||
async def mempool_hashes(self):
|
async def mempool_hashes(self):
|
||||||
'''Return the hashes of the txs in the daemon's mempool.'''
|
'''Return the hashes of the txs in the daemon's mempool.'''
|
||||||
|
|||||||
@ -404,8 +404,7 @@ class ElectrumX(JSONRPC):
|
|||||||
self.logger.info('sent tx: {}'.format(tx_hash))
|
self.logger.info('sent tx: {}'.format(tx_hash))
|
||||||
return tx_hash
|
return tx_hash
|
||||||
except DaemonError as e:
|
except DaemonError as e:
|
||||||
errors = e.args[0]
|
error = e.args[0]
|
||||||
error = errors[0]
|
|
||||||
message = error['message']
|
message = error['message']
|
||||||
self.logger.info('sendrawtransaction: {}'.format(message))
|
self.logger.info('sendrawtransaction: {}'.format(message))
|
||||||
if 'non-mandatory-script-verify-flag' in message:
|
if 'non-mandatory-script-verify-flag' in message:
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user