diff --git a/electrumx/lib/coins.py b/electrumx/lib/coins.py index 905b423..2cce41d 100644 --- a/electrumx/lib/coins.py +++ b/electrumx/lib/coins.py @@ -111,10 +111,6 @@ class Coin(object): url = 'http://' + url return url + '/' - @classmethod - def daemon_urls(cls, urls): - return [cls.sanitize_url(url) for url in urls.split(',')] - @classmethod def genesis_block(cls, block): '''Check the Genesis block is the right one for this coin. diff --git a/electrumx/lib/server_base.py b/electrumx/lib/server_base.py index db7a213..f3688d6 100644 --- a/electrumx/lib/server_base.py +++ b/electrumx/lib/server_base.py @@ -13,7 +13,7 @@ import sys import time from functools import partial -from aiorpcx import TaskGroup +from aiorpcx import spawn from electrumx.lib.util import class_logger @@ -93,12 +93,11 @@ class ServerBase(object): loop.set_exception_handler(self.on_exception) shutdown_event = asyncio.Event() - async with TaskGroup() as group: - server_task = await group.spawn(self.serve(shutdown_event)) - # Wait for shutdown, log on receipt of the event - await shutdown_event.wait() - self.logger.info('shutting down') - server_task.cancel() + server_task = await spawn(self.serve(shutdown_event)) + # Wait for shutdown, log on receipt of the event + await shutdown_event.wait() + self.logger.info('shutting down') + server_task.cancel() # Prevent some silly logs await asyncio.sleep(0.01) diff --git a/electrumx/server/block_processor.py b/electrumx/server/block_processor.py index bea879e..e658fe5 100644 --- a/electrumx/server/block_processor.py +++ b/electrumx/server/block_processor.py @@ -650,10 +650,7 @@ class BlockProcessor(object): could be lost. ''' self._caught_up_event = caught_up_event - async with TaskGroup() as group: - await group.spawn(self._first_open_dbs()) - # Ensure cached_height is set - await group.spawn(self.daemon.height()) + await self._first_open_dbs() try: async with TaskGroup() as group: await group.spawn(self.prefetcher.main_loop(self.height)) diff --git a/electrumx/server/controller.py b/electrumx/server/controller.py index 5b156c2..09d022b 100644 --- a/electrumx/server/controller.py +++ b/electrumx/server/controller.py @@ -92,9 +92,11 @@ class Controller(ServerBase): self.logger.info(f'reorg limit is {env.reorg_limit:,d} blocks') notifications = Notifications() - daemon = env.coin.DAEMON(env) - db = DB(env) + Daemon = env.coin.DAEMON BlockProcessor = env.coin.BLOCK_PROCESSOR + + daemon = Daemon(env.coin, env.daemon_url) + db = DB(env) bp = BlockProcessor(env, db, daemon, notifications) # Set ourselves up to implement the MemPoolAPI @@ -110,10 +112,13 @@ class Controller(ServerBase): session_mgr = SessionManager(env, db, bp, daemon, mempool, notifications, shutdown_event) + # Test daemon authentication, and also ensure it has a cached + # height. Do this before entering the task group. + await daemon.height() + caught_up_event = Event() serve_externally_event = Event() synchronized_event = Event() - async with TaskGroup() as group: await group.spawn(session_mgr.serve(serve_externally_event)) await group.spawn(bp.fetch_and_process_blocks(caught_up_event)) diff --git a/electrumx/server/daemon.py b/electrumx/server/daemon.py index be22e99..c21bc5a 100644 --- a/electrumx/server/daemon.py +++ b/electrumx/server/daemon.py @@ -9,6 +9,7 @@ daemon.''' import asyncio +import itertools import json import time from calendar import timegm @@ -28,48 +29,53 @@ class DaemonError(Exception): '''Raised when the daemon returns an error in its results.''' +class WarmingUpError(Exception): + '''Internal - when the daemon is warming up.''' + + +class WorkQueueFullError(Exception): + '''Internal - when the daemon's work queue is full.''' + + class Daemon(object): '''Handles connections to a daemon at the given URL.''' WARMING_UP = -28 - RPC_MISC_ERROR = -1 + id_counter = itertools.count() - class DaemonWarmingUpError(Exception): - '''Raised when the daemon returns an error in its results.''' - - def __init__(self, env): + def __init__(self, coin, url, max_workqueue=10, init_retry=0.25, + max_retry=4.0): + self.coin = coin self.logger = class_logger(__name__, self.__class__.__name__) - self.coin = env.coin - self.set_urls(env.coin.daemon_urls(env.daemon_url)) - self._height = None + self.set_url(url) # Limit concurrent RPC calls to this number. # See DEFAULT_HTTP_WORKQUEUE in bitcoind, which is typically 16 - self.workqueue_semaphore = asyncio.Semaphore(value=10) - self.down = False - self.last_error_time = 0 - self.req_id = 0 - self._available_rpcs = {} # caches results for _is_rpc_available() + self.workqueue_semaphore = asyncio.Semaphore(value=max_workqueue) + self.init_retry = init_retry + self.max_retry = max_retry + self._height = None + self.available_rpcs = {} - def next_req_id(self): - '''Retrns the next request ID.''' - self.req_id += 1 - return self.req_id - - def set_urls(self, urls): + def set_url(self, url): '''Set the URLS to the given list, and switch to the first one.''' - if not urls: - raise DaemonError('no daemon URLs provided') - self.urls = urls - self.url_index = 0 + urls = url.split(',') + urls = [self.coin.sanitize_url(url) for url in urls] for n, url in enumerate(urls): - self.logger.info('daemon #{:d} at {}{}' - .format(n + 1, self.logged_url(url), - '' if n else ' (current)')) + status = '' if n else ' (current)' + logged_url = self.logged_url(url) + self.logger.info(f'daemon #{n + 1} at {logged_url}{status}') + self.url_index = 0 + self.urls = urls - def url(self): + def current_url(self): '''Returns the current daemon URL.''' return self.urls[self.url_index] + def logged_url(self, url=None): + '''The host and port part, for logging.''' + url = url or self.current_url() + return url[url.rindex('@') + 1:] + def failover(self): '''Call to fail-over to the next daemon URL. @@ -77,7 +83,7 @@ class Daemon(object): ''' if len(self.urls) > 1: self.url_index = (self.url_index + 1) % len(self.urls) - self.logger.info('failing over to {}'.format(self.logged_url())) + self.logger.info(f'failing over to {self.logged_url()}') return True return False @@ -88,13 +94,17 @@ class Daemon(object): async def _send_data(self, data): async with self.workqueue_semaphore: async with self.client_session() as session: - async with session.post(self.url(), data=data) as resp: - # If bitcoind can't find a tx, for some reason - # it returns 500 but fills out the JSON. - # Should still return 200 IMO. - if resp.status in (200, 404, 500): + async with session.post(self.current_url(), data=data) as resp: + kind = resp.headers.get('Content-Type', None) + if kind == 'application/json': return await resp.json() - return (resp.status, resp.reason) + # bitcoind's HTTP protocol "handling" is a bad joke + text = await resp.text() + if 'Work queue depth exceeded' in text: + raise WorkQueueFullError + text = text.strip() or resp.reason + self.logger.error(text) + raise DaemonError(text) async def _send(self, payload, processor): '''Send a payload to be converted to JSON. @@ -103,54 +113,42 @@ class Daemon(object): are raise through DaemonError. ''' def log_error(error): - self.down = True + nonlocal last_error_log, retry now = time.time() - prior_time = self.last_error_time - if now - prior_time > 60: - self.last_error_time = now - if prior_time and self.failover(): - secs = 0 - else: - self.logger.error('{} Retrying occasionally...' - .format(error)) + if now - last_error_log > 60: + last_error_time = now + self.logger.error(f'{error} Retrying occasionally...') + if retry == self.max_retry and self.failover(): + retry = 0 + on_good_message = None + last_error_log = 0 data = json.dumps(payload) - secs = 1 - max_secs = 4 + retry = self.init_retry while True: try: result = await self._send_data(data) - if not isinstance(result, tuple): - result = processor(result) - if self.down: - self.down = False - self.last_error_time = 0 - self.logger.info('connection restored') - return result - log_error('HTTP error code {:d}: {}' - .format(result[0], result[1])) + result = processor(result) + if on_good_message: + self.logger.info(on_good_message) + return result except asyncio.TimeoutError: log_error('timeout error.') except aiohttp.ServerDisconnectedError: log_error('disconnected.') - except aiohttp.ClientPayloadError: - log_error('payload encoding error.') + on_good_message = 'connection restored' except aiohttp.ClientConnectionError: log_error('connection problem - is your daemon running?') - except self.DaemonWarmingUpError: + on_good_message = 'connection restored' + except WarmingUpError: log_error('starting up checking blocks.') - except (asyncio.CancelledError, DaemonError): - raise - except Exception as e: - self.logger.exception(f'uncaught exception: {e}') + on_good_message = 'running normally' + except WorkQueueFullError: + log_error('work queue full.') + on_good_message = 'running normally' - await asyncio.sleep(secs) - secs = min(max_secs, secs * 2, 1) - - def logged_url(self, url=None): - '''The host and port part, for logging.''' - url = url or self.url() - return url[url.rindex('@') + 1:] + await asyncio.sleep(retry) + retry = max(min(self.max_retry, retry * 2), self.init_retry) async def _send_single(self, method, params=None): '''Send a single request to the daemon.''' @@ -159,10 +157,10 @@ class Daemon(object): if not err: return result['result'] if err.get('code') == self.WARMING_UP: - raise self.DaemonWarmingUpError + raise WarmingUpError raise DaemonError(err) - payload = {'method': method, 'id': self.next_req_id()} + payload = {'method': method, 'id': next(self.id_counter)} if params: payload['params'] = params return await self._send(payload, processor) @@ -176,12 +174,12 @@ class Daemon(object): def processor(result): errs = [item['error'] for item in result if item['error']] if any(err.get('code') == self.WARMING_UP for err in errs): - raise self.DaemonWarmingUpError + raise WarmingUpError if not errs or replace_errs: return [item['result'] for item in result] raise DaemonError(errs) - payload = [{'method': method, 'params': p, 'id': self.next_req_id()} + payload = [{'method': method, 'params': p, 'id': next(self.id_counter)} for p in params_iterable] if payload: return await self._send(payload, processor) @@ -192,27 +190,16 @@ class Daemon(object): Results are cached and the daemon will generally not be queried with the same method more than once.''' - available = self._available_rpcs.get(method, None) + available = self.available_rpcs.get(method) if available is None: + available = True try: await self._send_single(method) - available = True except DaemonError as e: err = e.args[0] error_code = err.get("code") - if error_code == JSONRPC.METHOD_NOT_FOUND: - available = False - elif error_code == self.RPC_MISC_ERROR: - # method found but exception was thrown in command handling - # probably because we did not provide arguments - available = True - else: - self.logger.warning('error (code {:d}: {}) when testing ' - 'RPC availability of method {}' - .format(error_code, err.get("message"), - method)) - available = False - self._available_rpcs[method] = available + available = error_code != JSONRPC.METHOD_NOT_FOUND + self.available_rpcs[method] = available return available async def block_hex_hashes(self, first, count): @@ -235,12 +222,16 @@ class Daemon(object): '''Update our record of the daemon's mempool hashes.''' return await self._send_single('getrawmempool') - async def estimatefee(self, params): - '''Return the fee estimate for the given parameters.''' + async def estimatefee(self, block_count): + '''Return the fee estimate for the block count. Units are whole + currency units per KB, e.g. 0.00000995, or -1 if no estimate + is available. + ''' + args = (block_count, ) if await self._is_rpc_available('estimatesmartfee'): - estimate = await self._send_single('estimatesmartfee', params) + estimate = await self._send_single('estimatesmartfee', args) return estimate.get('feerate', -1) - return await self._send_single('estimatefee', params) + return await self._send_single('estimatefee', args) async def getnetworkinfo(self): '''Return the result of the 'getnetworkinfo' RPC call.''' @@ -268,9 +259,9 @@ class Daemon(object): # Convert hex strings to bytes return [hex_to_bytes(tx) if tx else None for tx in txs] - async def sendrawtransaction(self, params): + async def broadcast_transaction(self, raw_tx): '''Broadcast a transaction to the network.''' - return await self._send_single('sendrawtransaction', params) + return await self._send_single('sendrawtransaction', (raw_tx, )) async def height(self): '''Query the daemon for its current height.''' @@ -299,7 +290,7 @@ class FakeEstimateFeeDaemon(Daemon): '''Daemon that simulates estimatefee and relayfee RPC calls. Coin that wants to use this daemon must define ESTIMATE_FEE & RELAY_FEE''' - async def estimatefee(self, params): + async def estimatefee(self, block_count): '''Return the fee estimate for the given parameters.''' return self.coin.ESTIMATE_FEE diff --git a/electrumx/server/session.py b/electrumx/server/session.py index 37290cf..c379f45 100644 --- a/electrumx/server/session.py +++ b/electrumx/server/session.py @@ -383,7 +383,7 @@ class SessionManager(object): '''Replace the daemon URL.''' daemon_url = daemon_url or self.env.daemon_url try: - self.daemon.set_urls(self.env.coin.daemon_urls(daemon_url)) + self.daemon.set_url(daemon_url) except Exception as e: raise RPCError(BAD_REQUEST, f'an error occured: {e!r}') return f'now using daemon at {self.daemon.logged_url()}' @@ -535,7 +535,7 @@ class SessionManager(object): return electrum_header async def broadcast_transaction(self, raw_tx): - hex_hash = await self.daemon.sendrawtransaction([raw_tx]) + hex_hash = await self.daemon.broadcast_transaction(raw_tx) self.txs_sent += 1 return hex_hash @@ -1088,7 +1088,7 @@ class ElectrumX(SessionBase): number: the number of blocks ''' number = non_negative_integer(number) - return await self.daemon_request('estimatefee', [number]) + return await self.daemon_request('estimatefee', number) async def ping(self): '''Serves as a connection keep-alive mechanism and for the client to @@ -1144,7 +1144,7 @@ class ElectrumX(SessionBase): except DaemonError as e: error, = e.args message = error['message'] - self.logger.info(f'sendrawtransaction: {message}') + self.logger.info(f'error sending transaction: {message}') raise RPCError(BAD_REQUEST, 'the transaction was rejected by ' f'network rules.\n\n{message}\n[{raw_tx}]') diff --git a/tests/server/test_daemon.py b/tests/server/test_daemon.py new file mode 100644 index 0000000..2a905a9 --- /dev/null +++ b/tests/server/test_daemon.py @@ -0,0 +1,489 @@ +import aiohttp +import asyncio +import json +import logging + +import pytest + +from aiorpcx import ( + JSONRPCv1, JSONRPCLoose, RPCError, ignore_after, + Request, Batch, +) +from electrumx.lib.coins import BitcoinCash, CoinError, Bitzeny +from electrumx.server.daemon import ( + Daemon, FakeEstimateFeeDaemon, DaemonError +) + + +coin = BitcoinCash + +# These should be full, canonical URLs +urls = ['http://rpc_user:rpc_pass@127.0.0.1:8332/', + 'http://rpc_user:rpc_pass@192.168.0.1:8332/'] + + +@pytest.fixture(params=[BitcoinCash, Bitzeny]) +def daemon(request): + coin = request.param + return coin.DAEMON(coin, ','.join(urls)) + + +class ResponseBase(object): + + def __init__(self, headers, status): + self.headers = headers + self.status = status + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_value, traceback): + pass + + +class JSONResponse(ResponseBase): + + def __init__(self, result, msg_id, status=200): + super().__init__({'Content-Type': 'application/json'}, status) + self.result = result + self.msg_id = msg_id + + async def json(self): + if isinstance(self.msg_id, int): + message = JSONRPCv1.response_message(self.result, self.msg_id) + else: + parts = [JSONRPCv1.response_message(item, msg_id) + for item, msg_id in zip(self.result, self.msg_id)] + message = JSONRPCv1.batch_message_from_parts(parts) + return json.loads(message.decode()) + + +class HTMLResponse(ResponseBase): + + def __init__(self, text, reason, status): + super().__init__({'Content-Type': 'text/html; charset=ISO-8859-1'}, + status) + self._text = text + self.reason = reason + + async def text(self): + return self._text + + +class ClientSessionBase(object): + + def __enter__(self): + self.prior_class = aiohttp.ClientSession + aiohttp.ClientSession = lambda: self + + def __exit__(self, exc_type, exc_value, traceback): + aiohttp.ClientSession = self.prior_class + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_value, traceback): + pass + + +class ClientSessionGood(ClientSessionBase): + '''Imitate aiohttp for testing purposes.''' + + def __init__(self, *triples): + self.triples = triples # each a (method, args, result) + self.count = 0 + self.expected_url = urls[0] + + def post(self, url, data=""): + assert url == self.expected_url + request, request_id = JSONRPCLoose.message_to_item(data.encode()) + method, args, result = self.triples[self.count] + self.count += 1 + if isinstance(request, Request): + assert request.method == method + assert request.args == args + return JSONResponse(result, request_id) + else: + assert isinstance(request, Batch) + for request, args in zip(request, args): + assert request.method == method + assert request.args == args + return JSONResponse(result, request_id) + + +class ClientSessionBadAuth(ClientSessionBase): + + def post(self, url, data=""): + return HTMLResponse('', 'Unauthorized', 401) + + +class ClientSessionWorkQueueFull(ClientSessionGood): + + def post(self, url, data=""): + self.post = super().post + return HTMLResponse('Work queue depth exceeded', + 'Internal server error', 500) + + +class ClientSessionNoConnection(ClientSessionGood): + + def __init__(self, *args): + self.args = args + + async def __aenter__(self): + aiohttp.ClientSession = lambda: ClientSessionGood(*self.args) + raise aiohttp.ClientConnectionError + + +class ClientSessionPostError(ClientSessionGood): + + def __init__(self, exception, *args): + self.exception = exception + self.args = args + + def post(self, url, data=""): + aiohttp.ClientSession = lambda: ClientSessionGood(*self.args) + raise self.exception + + +class ClientSessionFailover(ClientSessionGood): + + def post(self, url, data=""): + # If not failed over; simulate disconnecting + if url == self.expected_url: + raise aiohttp.ServerDisconnectedError + else: + self.expected_url = urls[1] + return super().post(url, data) + + +def in_caplog(caplog, message, count=1): + return sum(message in record.message + for record in caplog.records) == count + +# +# Tests +# + +def test_set_urls_bad(): + with pytest.raises(CoinError): + Daemon(coin, '') + with pytest.raises(CoinError): + Daemon(coin, 'a') + + +def test_set_urls_one(caplog): + with caplog.at_level(logging.INFO): + daemon = Daemon(coin, urls[0]) + assert daemon.current_url() == urls[0] + assert len(daemon.urls) == 1 + logged_url = daemon.logged_url() + assert logged_url == '127.0.0.1:8332/' + assert in_caplog(caplog, f'daemon #1 at {logged_url} (current)') + + +def test_set_urls_two(caplog): + with caplog.at_level(logging.INFO): + daemon = Daemon(coin, ','.join(urls)) + assert daemon.current_url() == urls[0] + assert len(daemon.urls) == 2 + logged_url = daemon.logged_url() + assert logged_url == '127.0.0.1:8332/' + assert in_caplog(caplog, f'daemon #1 at {logged_url} (current)') + assert in_caplog(caplog, 'daemon #2 at 192.168.0.1:8332') + + +def test_set_urls_short(): + no_prefix_urls = ['/'.join(part for part in url.split('/')[2:]) + for url in urls] + daemon = Daemon(coin, ','.join(no_prefix_urls)) + assert daemon.current_url() == urls[0] + assert len(daemon.urls) == 2 + + no_slash_urls = [url[:-1] for url in urls] + daemon = Daemon(coin, ','.join(no_slash_urls)) + assert daemon.current_url() == urls[0] + assert len(daemon.urls) == 2 + + no_port_urls = [url[:url.rfind(':')] for url in urls] + daemon = Daemon(coin, ','.join(no_port_urls)) + assert daemon.current_url() == urls[0] + assert len(daemon.urls) == 2 + + +def test_failover_good(caplog): + daemon = Daemon(coin, ','.join(urls)) + with caplog.at_level(logging.INFO): + result = daemon.failover() + assert result is True + assert daemon.current_url() == urls[1] + logged_url = daemon.logged_url() + assert in_caplog(caplog, f'failing over to {logged_url}') + # And again + result = daemon.failover() + assert result is True + assert daemon.current_url() == urls[0] + + +def test_failover_fail(caplog): + daemon = Daemon(coin, urls[0]) + with caplog.at_level(logging.INFO): + result = daemon.failover() + assert result is False + assert daemon.current_url() == urls[0] + assert not in_caplog(caplog, f'failing over') + + +@pytest.mark.asyncio +async def test_height(daemon): + assert daemon.cached_height() is None + height = 300 + with ClientSessionGood(('getblockcount', [], height)): + assert await daemon.height() == height + assert daemon.cached_height() == height + + +@pytest.mark.asyncio +async def test_broadcast_transaction(daemon): + raw_tx = 'deadbeef' + tx_hash = 'hash' + with ClientSessionGood(('sendrawtransaction', [raw_tx], tx_hash)): + assert await daemon.broadcast_transaction(raw_tx) == tx_hash + + +@pytest.mark.asyncio +async def test_relayfee(daemon): + response = {"relayfee": sats, "other:": "cruft"} + with ClientSessionGood(('getnetworkinfo', [], response)): + assert await daemon.getnetworkinfo() == response + + +@pytest.mark.asyncio +async def test_relayfee(daemon): + if isinstance(daemon, FakeEstimateFeeDaemon): + sats = daemon.coin.ESTIMATE_FEE + else: + sats = 2 + response = {"relayfee": sats, "other:": "cruft"} + with ClientSessionGood(('getnetworkinfo', [], response)): + assert await daemon.relayfee() == sats + + +@pytest.mark.asyncio +async def test_mempool_hashes(daemon): + hashes = ['hex_hash1', 'hex_hash2'] + with ClientSessionGood(('getrawmempool', [], hashes)): + assert await daemon.mempool_hashes() == hashes + + +@pytest.mark.asyncio +async def test_deserialised_block(daemon): + block_hash = 'block_hash' + result = {'some': 'mess'} + with ClientSessionGood(('getblock', [block_hash, True], result)): + assert await daemon.deserialised_block(block_hash) == result + + +@pytest.mark.asyncio +async def test_estimatefee(daemon): + method_not_found = RPCError(JSONRPCv1.METHOD_NOT_FOUND, 'nope') + if isinstance(daemon, FakeEstimateFeeDaemon): + result = daemon.coin.ESTIMATE_FEE + else: + result = -1 + with ClientSessionGood( + ('estimatesmartfee', [], method_not_found), + ('estimatefee', [2], result) + ): + assert await daemon.estimatefee(2) == result + + +@pytest.mark.asyncio +async def test_estimatefee_smart(daemon): + bad_args = RPCError(JSONRPCv1.INVALID_ARGS, 'bad args') + if isinstance(daemon, FakeEstimateFeeDaemon): + return + rate = 0.0002 + result = {'feerate': rate} + with ClientSessionGood( + ('estimatesmartfee', [], bad_args), + ('estimatesmartfee', [2], result) + ): + assert await daemon.estimatefee(2) == rate + + # Test the rpc_available_cache is used + with ClientSessionGood(('estimatesmartfee', [2], result)): + assert await daemon.estimatefee(2) == rate + + +@pytest.mark.asyncio +async def test_getrawtransaction(daemon): + hex_hash = 'deadbeef' + simple = 'tx_in_hex' + verbose = {'hex': hex_hash, 'other': 'cruft'} + # Test False is converted to 0 - old daemon's reject False + with ClientSessionGood(('getrawtransaction', [hex_hash, 0], simple)): + assert await daemon.getrawtransaction(hex_hash) == simple + + # Test True is converted to 1 + with ClientSessionGood(('getrawtransaction', [hex_hash, 1], verbose)): + assert await daemon.getrawtransaction( + hex_hash, True) == verbose + + +# Batch tests + +@pytest.mark.asyncio +async def test_empty_send(daemon): + first = 5 + count = 0 + with ClientSessionGood(('getblockhash', [], [])): + assert await daemon.block_hex_hashes(first, count) == [] + + +@pytest.mark.asyncio +async def test_block_hex_hashes(daemon): + first = 5 + count = 3 + hashes = [f'hex_hash{n}' for n in range(count)] + with ClientSessionGood(('getblockhash', + [[n] for n in range(first, first + count)], + hashes)): + assert await daemon.block_hex_hashes(first, count) == hashes + + +@pytest.mark.asyncio +async def test_raw_blocks(daemon): + count = 3 + hex_hashes = [f'hex_hash{n}' for n in range(count)] + args_list = [[hex_hash, False] for hex_hash in hex_hashes] + iterable = (hex_hash for hex_hash in hex_hashes) + blocks = ["00", "019a", "02fe"] + blocks_raw = [bytes.fromhex(block) for block in blocks] + with ClientSessionGood(('getblock', args_list, blocks)): + assert await daemon.raw_blocks(iterable) == blocks_raw + + +@pytest.mark.asyncio +async def test_get_raw_transactions(daemon): + hex_hashes = ['deadbeef0', 'deadbeef1'] + args_list = [[hex_hash, 0] for hex_hash in hex_hashes] + raw_txs_hex = ['fffefdfc', '0a0b0c0d'] + raw_txs = [bytes.fromhex(raw_tx) for raw_tx in raw_txs_hex] + # Test 0 - old daemon's reject False + with ClientSessionGood(('getrawtransaction', args_list, raw_txs_hex)): + assert await daemon.getrawtransactions(hex_hashes) == raw_txs + + # Test one error + tx_not_found = RPCError(-1, 'some error message') + results = ['ff0b7d', tx_not_found] + raw_txs = [bytes.fromhex(results[0]), None] + with ClientSessionGood(('getrawtransaction', args_list, results)): + assert await daemon.getrawtransactions(hex_hashes) == raw_txs + + +# Other tests + +@pytest.mark.asyncio +async def test_bad_auth(daemon, caplog): + with pytest.raises(DaemonError) as e: + with ClientSessionBadAuth(): + await daemon.height() + + assert "Unauthorized" in e.value.args[0] + assert in_caplog(caplog, "Unauthorized") + + +@pytest.mark.asyncio +async def test_workqueue_depth(daemon, caplog): + daemon.init_retry = 0.01 + height = 125 + with caplog.at_level(logging.INFO): + with ClientSessionWorkQueueFull(('getblockcount', [], height)): + await daemon.height() == height + + assert in_caplog(caplog, "work queue full") + assert in_caplog(caplog, "running normally") + + +@pytest.mark.asyncio +async def test_connection_error(daemon, caplog): + height = 100 + daemon.init_retry = 0.01 + with caplog.at_level(logging.INFO): + with ClientSessionNoConnection(('getblockcount', [], height)): + await daemon.height() == height + + assert in_caplog(caplog, "connection problem - is your daemon running?") + assert in_caplog(caplog, "connection restored") + + +@pytest.mark.asyncio +async def test_timeout_error(daemon, caplog): + height = 100 + daemon.init_retry = 0.01 + with caplog.at_level(logging.INFO): + with ClientSessionPostError(asyncio.TimeoutError, + ('getblockcount', [], height)): + await daemon.height() == height + + assert in_caplog(caplog, "timeout error") + + +@pytest.mark.asyncio +async def test_disconnected(daemon, caplog): + height = 100 + daemon.init_retry = 0.01 + with caplog.at_level(logging.INFO): + with ClientSessionPostError(aiohttp.ServerDisconnectedError, + ('getblockcount', [], height)): + await daemon.height() == height + + assert in_caplog(caplog, "disconnected") + assert in_caplog(caplog, "connection restored") + + +@pytest.mark.asyncio +async def test_warming_up(daemon, caplog): + warming_up = RPCError(-28, 'reading block index') + height = 100 + daemon.init_retry = 0.01 + with caplog.at_level(logging.INFO): + with ClientSessionGood( + ('getblockcount', [], warming_up), + ('getblockcount', [], height) + ): + assert await daemon.height() == height + + assert in_caplog(caplog, "starting up checking blocks") + assert in_caplog(caplog, "running normally") + + +@pytest.mark.asyncio +async def test_warming_up_batch(daemon, caplog): + warming_up = RPCError(-28, 'reading block index') + first = 5 + count = 1 + daemon.init_retry = 0.01 + hashes = ['hex_hash5'] + with caplog.at_level(logging.INFO): + with ClientSessionGood(('getblockhash', [[first]], [warming_up]), + ('getblockhash', [[first]], hashes)): + assert await daemon.block_hex_hashes(first, count) == hashes + + assert in_caplog(caplog, "starting up checking blocks") + assert in_caplog(caplog, "running normally") + + +@pytest.mark.asyncio +async def test_failover(daemon, caplog): + height = 100 + daemon.init_retry = 0.01 + daemon.max_retry = 0.04 + with caplog.at_level(logging.INFO): + with ClientSessionFailover(('getblockcount', [], height)): + await daemon.height() == height + + assert in_caplog(caplog, "disconnected", 3) + assert in_caplog(caplog, "failing over") + assert in_caplog(caplog, "connection restored")