Merge branch 'daemon-tests' into devel
This commit is contained in:
commit
c5bb61fed2
@ -111,10 +111,6 @@ class Coin(object):
|
|||||||
url = 'http://' + url
|
url = 'http://' + url
|
||||||
return url + '/'
|
return url + '/'
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def daemon_urls(cls, urls):
|
|
||||||
return [cls.sanitize_url(url) for url in urls.split(',')]
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def genesis_block(cls, block):
|
def genesis_block(cls, block):
|
||||||
'''Check the Genesis block is the right one for this coin.
|
'''Check the Genesis block is the right one for this coin.
|
||||||
|
|||||||
@ -13,7 +13,7 @@ import sys
|
|||||||
import time
|
import time
|
||||||
from functools import partial
|
from functools import partial
|
||||||
|
|
||||||
from aiorpcx import TaskGroup
|
from aiorpcx import spawn
|
||||||
|
|
||||||
from electrumx.lib.util import class_logger
|
from electrumx.lib.util import class_logger
|
||||||
|
|
||||||
@ -93,12 +93,11 @@ class ServerBase(object):
|
|||||||
loop.set_exception_handler(self.on_exception)
|
loop.set_exception_handler(self.on_exception)
|
||||||
|
|
||||||
shutdown_event = asyncio.Event()
|
shutdown_event = asyncio.Event()
|
||||||
async with TaskGroup() as group:
|
server_task = await spawn(self.serve(shutdown_event))
|
||||||
server_task = await group.spawn(self.serve(shutdown_event))
|
# Wait for shutdown, log on receipt of the event
|
||||||
# Wait for shutdown, log on receipt of the event
|
await shutdown_event.wait()
|
||||||
await shutdown_event.wait()
|
self.logger.info('shutting down')
|
||||||
self.logger.info('shutting down')
|
server_task.cancel()
|
||||||
server_task.cancel()
|
|
||||||
|
|
||||||
# Prevent some silly logs
|
# Prevent some silly logs
|
||||||
await asyncio.sleep(0.01)
|
await asyncio.sleep(0.01)
|
||||||
|
|||||||
@ -650,10 +650,7 @@ class BlockProcessor(object):
|
|||||||
could be lost.
|
could be lost.
|
||||||
'''
|
'''
|
||||||
self._caught_up_event = caught_up_event
|
self._caught_up_event = caught_up_event
|
||||||
async with TaskGroup() as group:
|
await self._first_open_dbs()
|
||||||
await group.spawn(self._first_open_dbs())
|
|
||||||
# Ensure cached_height is set
|
|
||||||
await group.spawn(self.daemon.height())
|
|
||||||
try:
|
try:
|
||||||
async with TaskGroup() as group:
|
async with TaskGroup() as group:
|
||||||
await group.spawn(self.prefetcher.main_loop(self.height))
|
await group.spawn(self.prefetcher.main_loop(self.height))
|
||||||
|
|||||||
@ -92,9 +92,11 @@ class Controller(ServerBase):
|
|||||||
self.logger.info(f'reorg limit is {env.reorg_limit:,d} blocks')
|
self.logger.info(f'reorg limit is {env.reorg_limit:,d} blocks')
|
||||||
|
|
||||||
notifications = Notifications()
|
notifications = Notifications()
|
||||||
daemon = env.coin.DAEMON(env)
|
Daemon = env.coin.DAEMON
|
||||||
db = DB(env)
|
|
||||||
BlockProcessor = env.coin.BLOCK_PROCESSOR
|
BlockProcessor = env.coin.BLOCK_PROCESSOR
|
||||||
|
|
||||||
|
daemon = Daemon(env.coin, env.daemon_url)
|
||||||
|
db = DB(env)
|
||||||
bp = BlockProcessor(env, db, daemon, notifications)
|
bp = BlockProcessor(env, db, daemon, notifications)
|
||||||
|
|
||||||
# Set ourselves up to implement the MemPoolAPI
|
# Set ourselves up to implement the MemPoolAPI
|
||||||
@ -110,10 +112,13 @@ class Controller(ServerBase):
|
|||||||
session_mgr = SessionManager(env, db, bp, daemon, mempool,
|
session_mgr = SessionManager(env, db, bp, daemon, mempool,
|
||||||
notifications, shutdown_event)
|
notifications, shutdown_event)
|
||||||
|
|
||||||
|
# Test daemon authentication, and also ensure it has a cached
|
||||||
|
# height. Do this before entering the task group.
|
||||||
|
await daemon.height()
|
||||||
|
|
||||||
caught_up_event = Event()
|
caught_up_event = Event()
|
||||||
serve_externally_event = Event()
|
serve_externally_event = Event()
|
||||||
synchronized_event = Event()
|
synchronized_event = Event()
|
||||||
|
|
||||||
async with TaskGroup() as group:
|
async with TaskGroup() as group:
|
||||||
await group.spawn(session_mgr.serve(serve_externally_event))
|
await group.spawn(session_mgr.serve(serve_externally_event))
|
||||||
await group.spawn(bp.fetch_and_process_blocks(caught_up_event))
|
await group.spawn(bp.fetch_and_process_blocks(caught_up_event))
|
||||||
|
|||||||
@ -9,6 +9,7 @@
|
|||||||
daemon.'''
|
daemon.'''
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
|
import itertools
|
||||||
import json
|
import json
|
||||||
import time
|
import time
|
||||||
from calendar import timegm
|
from calendar import timegm
|
||||||
@ -28,48 +29,53 @@ class DaemonError(Exception):
|
|||||||
'''Raised when the daemon returns an error in its results.'''
|
'''Raised when the daemon returns an error in its results.'''
|
||||||
|
|
||||||
|
|
||||||
|
class WarmingUpError(Exception):
|
||||||
|
'''Internal - when the daemon is warming up.'''
|
||||||
|
|
||||||
|
|
||||||
|
class WorkQueueFullError(Exception):
|
||||||
|
'''Internal - when the daemon's work queue is full.'''
|
||||||
|
|
||||||
|
|
||||||
class Daemon(object):
|
class Daemon(object):
|
||||||
'''Handles connections to a daemon at the given URL.'''
|
'''Handles connections to a daemon at the given URL.'''
|
||||||
|
|
||||||
WARMING_UP = -28
|
WARMING_UP = -28
|
||||||
RPC_MISC_ERROR = -1
|
id_counter = itertools.count()
|
||||||
|
|
||||||
class DaemonWarmingUpError(Exception):
|
def __init__(self, coin, url, max_workqueue=10, init_retry=0.25,
|
||||||
'''Raised when the daemon returns an error in its results.'''
|
max_retry=4.0):
|
||||||
|
self.coin = coin
|
||||||
def __init__(self, env):
|
|
||||||
self.logger = class_logger(__name__, self.__class__.__name__)
|
self.logger = class_logger(__name__, self.__class__.__name__)
|
||||||
self.coin = env.coin
|
self.set_url(url)
|
||||||
self.set_urls(env.coin.daemon_urls(env.daemon_url))
|
|
||||||
self._height = None
|
|
||||||
# Limit concurrent RPC calls to this number.
|
# Limit concurrent RPC calls to this number.
|
||||||
# See DEFAULT_HTTP_WORKQUEUE in bitcoind, which is typically 16
|
# See DEFAULT_HTTP_WORKQUEUE in bitcoind, which is typically 16
|
||||||
self.workqueue_semaphore = asyncio.Semaphore(value=10)
|
self.workqueue_semaphore = asyncio.Semaphore(value=max_workqueue)
|
||||||
self.down = False
|
self.init_retry = init_retry
|
||||||
self.last_error_time = 0
|
self.max_retry = max_retry
|
||||||
self.req_id = 0
|
self._height = None
|
||||||
self._available_rpcs = {} # caches results for _is_rpc_available()
|
self.available_rpcs = {}
|
||||||
|
|
||||||
def next_req_id(self):
|
def set_url(self, url):
|
||||||
'''Retrns the next request ID.'''
|
|
||||||
self.req_id += 1
|
|
||||||
return self.req_id
|
|
||||||
|
|
||||||
def set_urls(self, urls):
|
|
||||||
'''Set the URLS to the given list, and switch to the first one.'''
|
'''Set the URLS to the given list, and switch to the first one.'''
|
||||||
if not urls:
|
urls = url.split(',')
|
||||||
raise DaemonError('no daemon URLs provided')
|
urls = [self.coin.sanitize_url(url) for url in urls]
|
||||||
self.urls = urls
|
|
||||||
self.url_index = 0
|
|
||||||
for n, url in enumerate(urls):
|
for n, url in enumerate(urls):
|
||||||
self.logger.info('daemon #{:d} at {}{}'
|
status = '' if n else ' (current)'
|
||||||
.format(n + 1, self.logged_url(url),
|
logged_url = self.logged_url(url)
|
||||||
'' if n else ' (current)'))
|
self.logger.info(f'daemon #{n + 1} at {logged_url}{status}')
|
||||||
|
self.url_index = 0
|
||||||
|
self.urls = urls
|
||||||
|
|
||||||
def url(self):
|
def current_url(self):
|
||||||
'''Returns the current daemon URL.'''
|
'''Returns the current daemon URL.'''
|
||||||
return self.urls[self.url_index]
|
return self.urls[self.url_index]
|
||||||
|
|
||||||
|
def logged_url(self, url=None):
|
||||||
|
'''The host and port part, for logging.'''
|
||||||
|
url = url or self.current_url()
|
||||||
|
return url[url.rindex('@') + 1:]
|
||||||
|
|
||||||
def failover(self):
|
def failover(self):
|
||||||
'''Call to fail-over to the next daemon URL.
|
'''Call to fail-over to the next daemon URL.
|
||||||
|
|
||||||
@ -77,7 +83,7 @@ class Daemon(object):
|
|||||||
'''
|
'''
|
||||||
if len(self.urls) > 1:
|
if len(self.urls) > 1:
|
||||||
self.url_index = (self.url_index + 1) % len(self.urls)
|
self.url_index = (self.url_index + 1) % len(self.urls)
|
||||||
self.logger.info('failing over to {}'.format(self.logged_url()))
|
self.logger.info(f'failing over to {self.logged_url()}')
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
@ -88,13 +94,17 @@ class Daemon(object):
|
|||||||
async def _send_data(self, data):
|
async def _send_data(self, data):
|
||||||
async with self.workqueue_semaphore:
|
async with self.workqueue_semaphore:
|
||||||
async with self.client_session() as session:
|
async with self.client_session() as session:
|
||||||
async with session.post(self.url(), data=data) as resp:
|
async with session.post(self.current_url(), data=data) as resp:
|
||||||
# If bitcoind can't find a tx, for some reason
|
kind = resp.headers.get('Content-Type', None)
|
||||||
# it returns 500 but fills out the JSON.
|
if kind == 'application/json':
|
||||||
# Should still return 200 IMO.
|
|
||||||
if resp.status in (200, 404, 500):
|
|
||||||
return await resp.json()
|
return await resp.json()
|
||||||
return (resp.status, resp.reason)
|
# bitcoind's HTTP protocol "handling" is a bad joke
|
||||||
|
text = await resp.text()
|
||||||
|
if 'Work queue depth exceeded' in text:
|
||||||
|
raise WorkQueueFullError
|
||||||
|
text = text.strip() or resp.reason
|
||||||
|
self.logger.error(text)
|
||||||
|
raise DaemonError(text)
|
||||||
|
|
||||||
async def _send(self, payload, processor):
|
async def _send(self, payload, processor):
|
||||||
'''Send a payload to be converted to JSON.
|
'''Send a payload to be converted to JSON.
|
||||||
@ -103,54 +113,42 @@ class Daemon(object):
|
|||||||
are raise through DaemonError.
|
are raise through DaemonError.
|
||||||
'''
|
'''
|
||||||
def log_error(error):
|
def log_error(error):
|
||||||
self.down = True
|
nonlocal last_error_log, retry
|
||||||
now = time.time()
|
now = time.time()
|
||||||
prior_time = self.last_error_time
|
if now - last_error_log > 60:
|
||||||
if now - prior_time > 60:
|
last_error_time = now
|
||||||
self.last_error_time = now
|
self.logger.error(f'{error} Retrying occasionally...')
|
||||||
if prior_time and self.failover():
|
if retry == self.max_retry and self.failover():
|
||||||
secs = 0
|
retry = 0
|
||||||
else:
|
|
||||||
self.logger.error('{} Retrying occasionally...'
|
|
||||||
.format(error))
|
|
||||||
|
|
||||||
|
on_good_message = None
|
||||||
|
last_error_log = 0
|
||||||
data = json.dumps(payload)
|
data = json.dumps(payload)
|
||||||
secs = 1
|
retry = self.init_retry
|
||||||
max_secs = 4
|
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
result = await self._send_data(data)
|
result = await self._send_data(data)
|
||||||
if not isinstance(result, tuple):
|
result = processor(result)
|
||||||
result = processor(result)
|
if on_good_message:
|
||||||
if self.down:
|
self.logger.info(on_good_message)
|
||||||
self.down = False
|
return result
|
||||||
self.last_error_time = 0
|
|
||||||
self.logger.info('connection restored')
|
|
||||||
return result
|
|
||||||
log_error('HTTP error code {:d}: {}'
|
|
||||||
.format(result[0], result[1]))
|
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
log_error('timeout error.')
|
log_error('timeout error.')
|
||||||
except aiohttp.ServerDisconnectedError:
|
except aiohttp.ServerDisconnectedError:
|
||||||
log_error('disconnected.')
|
log_error('disconnected.')
|
||||||
except aiohttp.ClientPayloadError:
|
on_good_message = 'connection restored'
|
||||||
log_error('payload encoding error.')
|
|
||||||
except aiohttp.ClientConnectionError:
|
except aiohttp.ClientConnectionError:
|
||||||
log_error('connection problem - is your daemon running?')
|
log_error('connection problem - is your daemon running?')
|
||||||
except self.DaemonWarmingUpError:
|
on_good_message = 'connection restored'
|
||||||
|
except WarmingUpError:
|
||||||
log_error('starting up checking blocks.')
|
log_error('starting up checking blocks.')
|
||||||
except (asyncio.CancelledError, DaemonError):
|
on_good_message = 'running normally'
|
||||||
raise
|
except WorkQueueFullError:
|
||||||
except Exception as e:
|
log_error('work queue full.')
|
||||||
self.logger.exception(f'uncaught exception: {e}')
|
on_good_message = 'running normally'
|
||||||
|
|
||||||
await asyncio.sleep(secs)
|
await asyncio.sleep(retry)
|
||||||
secs = min(max_secs, secs * 2, 1)
|
retry = max(min(self.max_retry, retry * 2), self.init_retry)
|
||||||
|
|
||||||
def logged_url(self, url=None):
|
|
||||||
'''The host and port part, for logging.'''
|
|
||||||
url = url or self.url()
|
|
||||||
return url[url.rindex('@') + 1:]
|
|
||||||
|
|
||||||
async def _send_single(self, method, params=None):
|
async def _send_single(self, method, params=None):
|
||||||
'''Send a single request to the daemon.'''
|
'''Send a single request to the daemon.'''
|
||||||
@ -159,10 +157,10 @@ class Daemon(object):
|
|||||||
if not err:
|
if not err:
|
||||||
return result['result']
|
return result['result']
|
||||||
if err.get('code') == self.WARMING_UP:
|
if err.get('code') == self.WARMING_UP:
|
||||||
raise self.DaemonWarmingUpError
|
raise WarmingUpError
|
||||||
raise DaemonError(err)
|
raise DaemonError(err)
|
||||||
|
|
||||||
payload = {'method': method, 'id': self.next_req_id()}
|
payload = {'method': method, 'id': next(self.id_counter)}
|
||||||
if params:
|
if params:
|
||||||
payload['params'] = params
|
payload['params'] = params
|
||||||
return await self._send(payload, processor)
|
return await self._send(payload, processor)
|
||||||
@ -176,12 +174,12 @@ class Daemon(object):
|
|||||||
def processor(result):
|
def processor(result):
|
||||||
errs = [item['error'] for item in result if item['error']]
|
errs = [item['error'] for item in result if item['error']]
|
||||||
if any(err.get('code') == self.WARMING_UP for err in errs):
|
if any(err.get('code') == self.WARMING_UP for err in errs):
|
||||||
raise self.DaemonWarmingUpError
|
raise WarmingUpError
|
||||||
if not errs or replace_errs:
|
if not errs or replace_errs:
|
||||||
return [item['result'] for item in result]
|
return [item['result'] for item in result]
|
||||||
raise DaemonError(errs)
|
raise DaemonError(errs)
|
||||||
|
|
||||||
payload = [{'method': method, 'params': p, 'id': self.next_req_id()}
|
payload = [{'method': method, 'params': p, 'id': next(self.id_counter)}
|
||||||
for p in params_iterable]
|
for p in params_iterable]
|
||||||
if payload:
|
if payload:
|
||||||
return await self._send(payload, processor)
|
return await self._send(payload, processor)
|
||||||
@ -192,27 +190,16 @@ class Daemon(object):
|
|||||||
|
|
||||||
Results are cached and the daemon will generally not be queried with
|
Results are cached and the daemon will generally not be queried with
|
||||||
the same method more than once.'''
|
the same method more than once.'''
|
||||||
available = self._available_rpcs.get(method, None)
|
available = self.available_rpcs.get(method)
|
||||||
if available is None:
|
if available is None:
|
||||||
|
available = True
|
||||||
try:
|
try:
|
||||||
await self._send_single(method)
|
await self._send_single(method)
|
||||||
available = True
|
|
||||||
except DaemonError as e:
|
except DaemonError as e:
|
||||||
err = e.args[0]
|
err = e.args[0]
|
||||||
error_code = err.get("code")
|
error_code = err.get("code")
|
||||||
if error_code == JSONRPC.METHOD_NOT_FOUND:
|
available = error_code != JSONRPC.METHOD_NOT_FOUND
|
||||||
available = False
|
self.available_rpcs[method] = available
|
||||||
elif error_code == self.RPC_MISC_ERROR:
|
|
||||||
# method found but exception was thrown in command handling
|
|
||||||
# probably because we did not provide arguments
|
|
||||||
available = True
|
|
||||||
else:
|
|
||||||
self.logger.warning('error (code {:d}: {}) when testing '
|
|
||||||
'RPC availability of method {}'
|
|
||||||
.format(error_code, err.get("message"),
|
|
||||||
method))
|
|
||||||
available = False
|
|
||||||
self._available_rpcs[method] = available
|
|
||||||
return available
|
return available
|
||||||
|
|
||||||
async def block_hex_hashes(self, first, count):
|
async def block_hex_hashes(self, first, count):
|
||||||
@ -235,12 +222,16 @@ class Daemon(object):
|
|||||||
'''Update our record of the daemon's mempool hashes.'''
|
'''Update our record of the daemon's mempool hashes.'''
|
||||||
return await self._send_single('getrawmempool')
|
return await self._send_single('getrawmempool')
|
||||||
|
|
||||||
async def estimatefee(self, params):
|
async def estimatefee(self, block_count):
|
||||||
'''Return the fee estimate for the given parameters.'''
|
'''Return the fee estimate for the block count. Units are whole
|
||||||
|
currency units per KB, e.g. 0.00000995, or -1 if no estimate
|
||||||
|
is available.
|
||||||
|
'''
|
||||||
|
args = (block_count, )
|
||||||
if await self._is_rpc_available('estimatesmartfee'):
|
if await self._is_rpc_available('estimatesmartfee'):
|
||||||
estimate = await self._send_single('estimatesmartfee', params)
|
estimate = await self._send_single('estimatesmartfee', args)
|
||||||
return estimate.get('feerate', -1)
|
return estimate.get('feerate', -1)
|
||||||
return await self._send_single('estimatefee', params)
|
return await self._send_single('estimatefee', args)
|
||||||
|
|
||||||
async def getnetworkinfo(self):
|
async def getnetworkinfo(self):
|
||||||
'''Return the result of the 'getnetworkinfo' RPC call.'''
|
'''Return the result of the 'getnetworkinfo' RPC call.'''
|
||||||
@ -268,9 +259,9 @@ class Daemon(object):
|
|||||||
# Convert hex strings to bytes
|
# Convert hex strings to bytes
|
||||||
return [hex_to_bytes(tx) if tx else None for tx in txs]
|
return [hex_to_bytes(tx) if tx else None for tx in txs]
|
||||||
|
|
||||||
async def sendrawtransaction(self, params):
|
async def broadcast_transaction(self, raw_tx):
|
||||||
'''Broadcast a transaction to the network.'''
|
'''Broadcast a transaction to the network.'''
|
||||||
return await self._send_single('sendrawtransaction', params)
|
return await self._send_single('sendrawtransaction', (raw_tx, ))
|
||||||
|
|
||||||
async def height(self):
|
async def height(self):
|
||||||
'''Query the daemon for its current height.'''
|
'''Query the daemon for its current height.'''
|
||||||
@ -299,7 +290,7 @@ class FakeEstimateFeeDaemon(Daemon):
|
|||||||
'''Daemon that simulates estimatefee and relayfee RPC calls. Coin that
|
'''Daemon that simulates estimatefee and relayfee RPC calls. Coin that
|
||||||
wants to use this daemon must define ESTIMATE_FEE & RELAY_FEE'''
|
wants to use this daemon must define ESTIMATE_FEE & RELAY_FEE'''
|
||||||
|
|
||||||
async def estimatefee(self, params):
|
async def estimatefee(self, block_count):
|
||||||
'''Return the fee estimate for the given parameters.'''
|
'''Return the fee estimate for the given parameters.'''
|
||||||
return self.coin.ESTIMATE_FEE
|
return self.coin.ESTIMATE_FEE
|
||||||
|
|
||||||
|
|||||||
@ -383,7 +383,7 @@ class SessionManager(object):
|
|||||||
'''Replace the daemon URL.'''
|
'''Replace the daemon URL.'''
|
||||||
daemon_url = daemon_url or self.env.daemon_url
|
daemon_url = daemon_url or self.env.daemon_url
|
||||||
try:
|
try:
|
||||||
self.daemon.set_urls(self.env.coin.daemon_urls(daemon_url))
|
self.daemon.set_url(daemon_url)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
raise RPCError(BAD_REQUEST, f'an error occured: {e!r}')
|
raise RPCError(BAD_REQUEST, f'an error occured: {e!r}')
|
||||||
return f'now using daemon at {self.daemon.logged_url()}'
|
return f'now using daemon at {self.daemon.logged_url()}'
|
||||||
@ -535,7 +535,7 @@ class SessionManager(object):
|
|||||||
return electrum_header
|
return electrum_header
|
||||||
|
|
||||||
async def broadcast_transaction(self, raw_tx):
|
async def broadcast_transaction(self, raw_tx):
|
||||||
hex_hash = await self.daemon.sendrawtransaction([raw_tx])
|
hex_hash = await self.daemon.broadcast_transaction(raw_tx)
|
||||||
self.txs_sent += 1
|
self.txs_sent += 1
|
||||||
return hex_hash
|
return hex_hash
|
||||||
|
|
||||||
@ -1088,7 +1088,7 @@ class ElectrumX(SessionBase):
|
|||||||
number: the number of blocks
|
number: the number of blocks
|
||||||
'''
|
'''
|
||||||
number = non_negative_integer(number)
|
number = non_negative_integer(number)
|
||||||
return await self.daemon_request('estimatefee', [number])
|
return await self.daemon_request('estimatefee', number)
|
||||||
|
|
||||||
async def ping(self):
|
async def ping(self):
|
||||||
'''Serves as a connection keep-alive mechanism and for the client to
|
'''Serves as a connection keep-alive mechanism and for the client to
|
||||||
@ -1144,7 +1144,7 @@ class ElectrumX(SessionBase):
|
|||||||
except DaemonError as e:
|
except DaemonError as e:
|
||||||
error, = e.args
|
error, = e.args
|
||||||
message = error['message']
|
message = error['message']
|
||||||
self.logger.info(f'sendrawtransaction: {message}')
|
self.logger.info(f'error sending transaction: {message}')
|
||||||
raise RPCError(BAD_REQUEST, 'the transaction was rejected by '
|
raise RPCError(BAD_REQUEST, 'the transaction was rejected by '
|
||||||
f'network rules.\n\n{message}\n[{raw_tx}]')
|
f'network rules.\n\n{message}\n[{raw_tx}]')
|
||||||
|
|
||||||
|
|||||||
489
tests/server/test_daemon.py
Normal file
489
tests/server/test_daemon.py
Normal file
@ -0,0 +1,489 @@
|
|||||||
|
import aiohttp
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from aiorpcx import (
|
||||||
|
JSONRPCv1, JSONRPCLoose, RPCError, ignore_after,
|
||||||
|
Request, Batch,
|
||||||
|
)
|
||||||
|
from electrumx.lib.coins import BitcoinCash, CoinError, Bitzeny
|
||||||
|
from electrumx.server.daemon import (
|
||||||
|
Daemon, FakeEstimateFeeDaemon, DaemonError
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
coin = BitcoinCash
|
||||||
|
|
||||||
|
# These should be full, canonical URLs
|
||||||
|
urls = ['http://rpc_user:rpc_pass@127.0.0.1:8332/',
|
||||||
|
'http://rpc_user:rpc_pass@192.168.0.1:8332/']
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(params=[BitcoinCash, Bitzeny])
|
||||||
|
def daemon(request):
|
||||||
|
coin = request.param
|
||||||
|
return coin.DAEMON(coin, ','.join(urls))
|
||||||
|
|
||||||
|
|
||||||
|
class ResponseBase(object):
|
||||||
|
|
||||||
|
def __init__(self, headers, status):
|
||||||
|
self.headers = headers
|
||||||
|
self.status = status
|
||||||
|
|
||||||
|
async def __aenter__(self):
|
||||||
|
return self
|
||||||
|
|
||||||
|
async def __aexit__(self, exc_type, exc_value, traceback):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class JSONResponse(ResponseBase):
|
||||||
|
|
||||||
|
def __init__(self, result, msg_id, status=200):
|
||||||
|
super().__init__({'Content-Type': 'application/json'}, status)
|
||||||
|
self.result = result
|
||||||
|
self.msg_id = msg_id
|
||||||
|
|
||||||
|
async def json(self):
|
||||||
|
if isinstance(self.msg_id, int):
|
||||||
|
message = JSONRPCv1.response_message(self.result, self.msg_id)
|
||||||
|
else:
|
||||||
|
parts = [JSONRPCv1.response_message(item, msg_id)
|
||||||
|
for item, msg_id in zip(self.result, self.msg_id)]
|
||||||
|
message = JSONRPCv1.batch_message_from_parts(parts)
|
||||||
|
return json.loads(message.decode())
|
||||||
|
|
||||||
|
|
||||||
|
class HTMLResponse(ResponseBase):
|
||||||
|
|
||||||
|
def __init__(self, text, reason, status):
|
||||||
|
super().__init__({'Content-Type': 'text/html; charset=ISO-8859-1'},
|
||||||
|
status)
|
||||||
|
self._text = text
|
||||||
|
self.reason = reason
|
||||||
|
|
||||||
|
async def text(self):
|
||||||
|
return self._text
|
||||||
|
|
||||||
|
|
||||||
|
class ClientSessionBase(object):
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
self.prior_class = aiohttp.ClientSession
|
||||||
|
aiohttp.ClientSession = lambda: self
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc_value, traceback):
|
||||||
|
aiohttp.ClientSession = self.prior_class
|
||||||
|
|
||||||
|
async def __aenter__(self):
|
||||||
|
return self
|
||||||
|
|
||||||
|
async def __aexit__(self, exc_type, exc_value, traceback):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class ClientSessionGood(ClientSessionBase):
|
||||||
|
'''Imitate aiohttp for testing purposes.'''
|
||||||
|
|
||||||
|
def __init__(self, *triples):
|
||||||
|
self.triples = triples # each a (method, args, result)
|
||||||
|
self.count = 0
|
||||||
|
self.expected_url = urls[0]
|
||||||
|
|
||||||
|
def post(self, url, data=""):
|
||||||
|
assert url == self.expected_url
|
||||||
|
request, request_id = JSONRPCLoose.message_to_item(data.encode())
|
||||||
|
method, args, result = self.triples[self.count]
|
||||||
|
self.count += 1
|
||||||
|
if isinstance(request, Request):
|
||||||
|
assert request.method == method
|
||||||
|
assert request.args == args
|
||||||
|
return JSONResponse(result, request_id)
|
||||||
|
else:
|
||||||
|
assert isinstance(request, Batch)
|
||||||
|
for request, args in zip(request, args):
|
||||||
|
assert request.method == method
|
||||||
|
assert request.args == args
|
||||||
|
return JSONResponse(result, request_id)
|
||||||
|
|
||||||
|
|
||||||
|
class ClientSessionBadAuth(ClientSessionBase):
|
||||||
|
|
||||||
|
def post(self, url, data=""):
|
||||||
|
return HTMLResponse('', 'Unauthorized', 401)
|
||||||
|
|
||||||
|
|
||||||
|
class ClientSessionWorkQueueFull(ClientSessionGood):
|
||||||
|
|
||||||
|
def post(self, url, data=""):
|
||||||
|
self.post = super().post
|
||||||
|
return HTMLResponse('Work queue depth exceeded',
|
||||||
|
'Internal server error', 500)
|
||||||
|
|
||||||
|
|
||||||
|
class ClientSessionNoConnection(ClientSessionGood):
|
||||||
|
|
||||||
|
def __init__(self, *args):
|
||||||
|
self.args = args
|
||||||
|
|
||||||
|
async def __aenter__(self):
|
||||||
|
aiohttp.ClientSession = lambda: ClientSessionGood(*self.args)
|
||||||
|
raise aiohttp.ClientConnectionError
|
||||||
|
|
||||||
|
|
||||||
|
class ClientSessionPostError(ClientSessionGood):
|
||||||
|
|
||||||
|
def __init__(self, exception, *args):
|
||||||
|
self.exception = exception
|
||||||
|
self.args = args
|
||||||
|
|
||||||
|
def post(self, url, data=""):
|
||||||
|
aiohttp.ClientSession = lambda: ClientSessionGood(*self.args)
|
||||||
|
raise self.exception
|
||||||
|
|
||||||
|
|
||||||
|
class ClientSessionFailover(ClientSessionGood):
|
||||||
|
|
||||||
|
def post(self, url, data=""):
|
||||||
|
# If not failed over; simulate disconnecting
|
||||||
|
if url == self.expected_url:
|
||||||
|
raise aiohttp.ServerDisconnectedError
|
||||||
|
else:
|
||||||
|
self.expected_url = urls[1]
|
||||||
|
return super().post(url, data)
|
||||||
|
|
||||||
|
|
||||||
|
def in_caplog(caplog, message, count=1):
|
||||||
|
return sum(message in record.message
|
||||||
|
for record in caplog.records) == count
|
||||||
|
|
||||||
|
#
|
||||||
|
# Tests
|
||||||
|
#
|
||||||
|
|
||||||
|
def test_set_urls_bad():
|
||||||
|
with pytest.raises(CoinError):
|
||||||
|
Daemon(coin, '')
|
||||||
|
with pytest.raises(CoinError):
|
||||||
|
Daemon(coin, 'a')
|
||||||
|
|
||||||
|
|
||||||
|
def test_set_urls_one(caplog):
|
||||||
|
with caplog.at_level(logging.INFO):
|
||||||
|
daemon = Daemon(coin, urls[0])
|
||||||
|
assert daemon.current_url() == urls[0]
|
||||||
|
assert len(daemon.urls) == 1
|
||||||
|
logged_url = daemon.logged_url()
|
||||||
|
assert logged_url == '127.0.0.1:8332/'
|
||||||
|
assert in_caplog(caplog, f'daemon #1 at {logged_url} (current)')
|
||||||
|
|
||||||
|
|
||||||
|
def test_set_urls_two(caplog):
|
||||||
|
with caplog.at_level(logging.INFO):
|
||||||
|
daemon = Daemon(coin, ','.join(urls))
|
||||||
|
assert daemon.current_url() == urls[0]
|
||||||
|
assert len(daemon.urls) == 2
|
||||||
|
logged_url = daemon.logged_url()
|
||||||
|
assert logged_url == '127.0.0.1:8332/'
|
||||||
|
assert in_caplog(caplog, f'daemon #1 at {logged_url} (current)')
|
||||||
|
assert in_caplog(caplog, 'daemon #2 at 192.168.0.1:8332')
|
||||||
|
|
||||||
|
|
||||||
|
def test_set_urls_short():
|
||||||
|
no_prefix_urls = ['/'.join(part for part in url.split('/')[2:])
|
||||||
|
for url in urls]
|
||||||
|
daemon = Daemon(coin, ','.join(no_prefix_urls))
|
||||||
|
assert daemon.current_url() == urls[0]
|
||||||
|
assert len(daemon.urls) == 2
|
||||||
|
|
||||||
|
no_slash_urls = [url[:-1] for url in urls]
|
||||||
|
daemon = Daemon(coin, ','.join(no_slash_urls))
|
||||||
|
assert daemon.current_url() == urls[0]
|
||||||
|
assert len(daemon.urls) == 2
|
||||||
|
|
||||||
|
no_port_urls = [url[:url.rfind(':')] for url in urls]
|
||||||
|
daemon = Daemon(coin, ','.join(no_port_urls))
|
||||||
|
assert daemon.current_url() == urls[0]
|
||||||
|
assert len(daemon.urls) == 2
|
||||||
|
|
||||||
|
|
||||||
|
def test_failover_good(caplog):
|
||||||
|
daemon = Daemon(coin, ','.join(urls))
|
||||||
|
with caplog.at_level(logging.INFO):
|
||||||
|
result = daemon.failover()
|
||||||
|
assert result is True
|
||||||
|
assert daemon.current_url() == urls[1]
|
||||||
|
logged_url = daemon.logged_url()
|
||||||
|
assert in_caplog(caplog, f'failing over to {logged_url}')
|
||||||
|
# And again
|
||||||
|
result = daemon.failover()
|
||||||
|
assert result is True
|
||||||
|
assert daemon.current_url() == urls[0]
|
||||||
|
|
||||||
|
|
||||||
|
def test_failover_fail(caplog):
|
||||||
|
daemon = Daemon(coin, urls[0])
|
||||||
|
with caplog.at_level(logging.INFO):
|
||||||
|
result = daemon.failover()
|
||||||
|
assert result is False
|
||||||
|
assert daemon.current_url() == urls[0]
|
||||||
|
assert not in_caplog(caplog, f'failing over')
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_height(daemon):
|
||||||
|
assert daemon.cached_height() is None
|
||||||
|
height = 300
|
||||||
|
with ClientSessionGood(('getblockcount', [], height)):
|
||||||
|
assert await daemon.height() == height
|
||||||
|
assert daemon.cached_height() == height
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_broadcast_transaction(daemon):
|
||||||
|
raw_tx = 'deadbeef'
|
||||||
|
tx_hash = 'hash'
|
||||||
|
with ClientSessionGood(('sendrawtransaction', [raw_tx], tx_hash)):
|
||||||
|
assert await daemon.broadcast_transaction(raw_tx) == tx_hash
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_relayfee(daemon):
|
||||||
|
response = {"relayfee": sats, "other:": "cruft"}
|
||||||
|
with ClientSessionGood(('getnetworkinfo', [], response)):
|
||||||
|
assert await daemon.getnetworkinfo() == response
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_relayfee(daemon):
|
||||||
|
if isinstance(daemon, FakeEstimateFeeDaemon):
|
||||||
|
sats = daemon.coin.ESTIMATE_FEE
|
||||||
|
else:
|
||||||
|
sats = 2
|
||||||
|
response = {"relayfee": sats, "other:": "cruft"}
|
||||||
|
with ClientSessionGood(('getnetworkinfo', [], response)):
|
||||||
|
assert await daemon.relayfee() == sats
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_mempool_hashes(daemon):
|
||||||
|
hashes = ['hex_hash1', 'hex_hash2']
|
||||||
|
with ClientSessionGood(('getrawmempool', [], hashes)):
|
||||||
|
assert await daemon.mempool_hashes() == hashes
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_deserialised_block(daemon):
|
||||||
|
block_hash = 'block_hash'
|
||||||
|
result = {'some': 'mess'}
|
||||||
|
with ClientSessionGood(('getblock', [block_hash, True], result)):
|
||||||
|
assert await daemon.deserialised_block(block_hash) == result
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_estimatefee(daemon):
|
||||||
|
method_not_found = RPCError(JSONRPCv1.METHOD_NOT_FOUND, 'nope')
|
||||||
|
if isinstance(daemon, FakeEstimateFeeDaemon):
|
||||||
|
result = daemon.coin.ESTIMATE_FEE
|
||||||
|
else:
|
||||||
|
result = -1
|
||||||
|
with ClientSessionGood(
|
||||||
|
('estimatesmartfee', [], method_not_found),
|
||||||
|
('estimatefee', [2], result)
|
||||||
|
):
|
||||||
|
assert await daemon.estimatefee(2) == result
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_estimatefee_smart(daemon):
|
||||||
|
bad_args = RPCError(JSONRPCv1.INVALID_ARGS, 'bad args')
|
||||||
|
if isinstance(daemon, FakeEstimateFeeDaemon):
|
||||||
|
return
|
||||||
|
rate = 0.0002
|
||||||
|
result = {'feerate': rate}
|
||||||
|
with ClientSessionGood(
|
||||||
|
('estimatesmartfee', [], bad_args),
|
||||||
|
('estimatesmartfee', [2], result)
|
||||||
|
):
|
||||||
|
assert await daemon.estimatefee(2) == rate
|
||||||
|
|
||||||
|
# Test the rpc_available_cache is used
|
||||||
|
with ClientSessionGood(('estimatesmartfee', [2], result)):
|
||||||
|
assert await daemon.estimatefee(2) == rate
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_getrawtransaction(daemon):
|
||||||
|
hex_hash = 'deadbeef'
|
||||||
|
simple = 'tx_in_hex'
|
||||||
|
verbose = {'hex': hex_hash, 'other': 'cruft'}
|
||||||
|
# Test False is converted to 0 - old daemon's reject False
|
||||||
|
with ClientSessionGood(('getrawtransaction', [hex_hash, 0], simple)):
|
||||||
|
assert await daemon.getrawtransaction(hex_hash) == simple
|
||||||
|
|
||||||
|
# Test True is converted to 1
|
||||||
|
with ClientSessionGood(('getrawtransaction', [hex_hash, 1], verbose)):
|
||||||
|
assert await daemon.getrawtransaction(
|
||||||
|
hex_hash, True) == verbose
|
||||||
|
|
||||||
|
|
||||||
|
# Batch tests
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_empty_send(daemon):
|
||||||
|
first = 5
|
||||||
|
count = 0
|
||||||
|
with ClientSessionGood(('getblockhash', [], [])):
|
||||||
|
assert await daemon.block_hex_hashes(first, count) == []
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_block_hex_hashes(daemon):
|
||||||
|
first = 5
|
||||||
|
count = 3
|
||||||
|
hashes = [f'hex_hash{n}' for n in range(count)]
|
||||||
|
with ClientSessionGood(('getblockhash',
|
||||||
|
[[n] for n in range(first, first + count)],
|
||||||
|
hashes)):
|
||||||
|
assert await daemon.block_hex_hashes(first, count) == hashes
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_raw_blocks(daemon):
|
||||||
|
count = 3
|
||||||
|
hex_hashes = [f'hex_hash{n}' for n in range(count)]
|
||||||
|
args_list = [[hex_hash, False] for hex_hash in hex_hashes]
|
||||||
|
iterable = (hex_hash for hex_hash in hex_hashes)
|
||||||
|
blocks = ["00", "019a", "02fe"]
|
||||||
|
blocks_raw = [bytes.fromhex(block) for block in blocks]
|
||||||
|
with ClientSessionGood(('getblock', args_list, blocks)):
|
||||||
|
assert await daemon.raw_blocks(iterable) == blocks_raw
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_get_raw_transactions(daemon):
|
||||||
|
hex_hashes = ['deadbeef0', 'deadbeef1']
|
||||||
|
args_list = [[hex_hash, 0] for hex_hash in hex_hashes]
|
||||||
|
raw_txs_hex = ['fffefdfc', '0a0b0c0d']
|
||||||
|
raw_txs = [bytes.fromhex(raw_tx) for raw_tx in raw_txs_hex]
|
||||||
|
# Test 0 - old daemon's reject False
|
||||||
|
with ClientSessionGood(('getrawtransaction', args_list, raw_txs_hex)):
|
||||||
|
assert await daemon.getrawtransactions(hex_hashes) == raw_txs
|
||||||
|
|
||||||
|
# Test one error
|
||||||
|
tx_not_found = RPCError(-1, 'some error message')
|
||||||
|
results = ['ff0b7d', tx_not_found]
|
||||||
|
raw_txs = [bytes.fromhex(results[0]), None]
|
||||||
|
with ClientSessionGood(('getrawtransaction', args_list, results)):
|
||||||
|
assert await daemon.getrawtransactions(hex_hashes) == raw_txs
|
||||||
|
|
||||||
|
|
||||||
|
# Other tests
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_bad_auth(daemon, caplog):
|
||||||
|
with pytest.raises(DaemonError) as e:
|
||||||
|
with ClientSessionBadAuth():
|
||||||
|
await daemon.height()
|
||||||
|
|
||||||
|
assert "Unauthorized" in e.value.args[0]
|
||||||
|
assert in_caplog(caplog, "Unauthorized")
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_workqueue_depth(daemon, caplog):
|
||||||
|
daemon.init_retry = 0.01
|
||||||
|
height = 125
|
||||||
|
with caplog.at_level(logging.INFO):
|
||||||
|
with ClientSessionWorkQueueFull(('getblockcount', [], height)):
|
||||||
|
await daemon.height() == height
|
||||||
|
|
||||||
|
assert in_caplog(caplog, "work queue full")
|
||||||
|
assert in_caplog(caplog, "running normally")
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_connection_error(daemon, caplog):
|
||||||
|
height = 100
|
||||||
|
daemon.init_retry = 0.01
|
||||||
|
with caplog.at_level(logging.INFO):
|
||||||
|
with ClientSessionNoConnection(('getblockcount', [], height)):
|
||||||
|
await daemon.height() == height
|
||||||
|
|
||||||
|
assert in_caplog(caplog, "connection problem - is your daemon running?")
|
||||||
|
assert in_caplog(caplog, "connection restored")
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_timeout_error(daemon, caplog):
|
||||||
|
height = 100
|
||||||
|
daemon.init_retry = 0.01
|
||||||
|
with caplog.at_level(logging.INFO):
|
||||||
|
with ClientSessionPostError(asyncio.TimeoutError,
|
||||||
|
('getblockcount', [], height)):
|
||||||
|
await daemon.height() == height
|
||||||
|
|
||||||
|
assert in_caplog(caplog, "timeout error")
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_disconnected(daemon, caplog):
|
||||||
|
height = 100
|
||||||
|
daemon.init_retry = 0.01
|
||||||
|
with caplog.at_level(logging.INFO):
|
||||||
|
with ClientSessionPostError(aiohttp.ServerDisconnectedError,
|
||||||
|
('getblockcount', [], height)):
|
||||||
|
await daemon.height() == height
|
||||||
|
|
||||||
|
assert in_caplog(caplog, "disconnected")
|
||||||
|
assert in_caplog(caplog, "connection restored")
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_warming_up(daemon, caplog):
|
||||||
|
warming_up = RPCError(-28, 'reading block index')
|
||||||
|
height = 100
|
||||||
|
daemon.init_retry = 0.01
|
||||||
|
with caplog.at_level(logging.INFO):
|
||||||
|
with ClientSessionGood(
|
||||||
|
('getblockcount', [], warming_up),
|
||||||
|
('getblockcount', [], height)
|
||||||
|
):
|
||||||
|
assert await daemon.height() == height
|
||||||
|
|
||||||
|
assert in_caplog(caplog, "starting up checking blocks")
|
||||||
|
assert in_caplog(caplog, "running normally")
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_warming_up_batch(daemon, caplog):
|
||||||
|
warming_up = RPCError(-28, 'reading block index')
|
||||||
|
first = 5
|
||||||
|
count = 1
|
||||||
|
daemon.init_retry = 0.01
|
||||||
|
hashes = ['hex_hash5']
|
||||||
|
with caplog.at_level(logging.INFO):
|
||||||
|
with ClientSessionGood(('getblockhash', [[first]], [warming_up]),
|
||||||
|
('getblockhash', [[first]], hashes)):
|
||||||
|
assert await daemon.block_hex_hashes(first, count) == hashes
|
||||||
|
|
||||||
|
assert in_caplog(caplog, "starting up checking blocks")
|
||||||
|
assert in_caplog(caplog, "running normally")
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_failover(daemon, caplog):
|
||||||
|
height = 100
|
||||||
|
daemon.init_retry = 0.01
|
||||||
|
daemon.max_retry = 0.04
|
||||||
|
with caplog.at_level(logging.INFO):
|
||||||
|
with ClientSessionFailover(('getblockcount', [], height)):
|
||||||
|
await daemon.height() == height
|
||||||
|
|
||||||
|
assert in_caplog(caplog, "disconnected", 3)
|
||||||
|
assert in_caplog(caplog, "failing over")
|
||||||
|
assert in_caplog(caplog, "connection restored")
|
||||||
Loading…
Reference in New Issue
Block a user