diff --git a/server/daemon.py b/server/daemon.py index 7a886fe..4696d76 100644 --- a/server/daemon.py +++ b/server/daemon.py @@ -45,20 +45,55 @@ class Daemon(util.LoggedClass): .format(height)) self._height = height - async def post(self, data): - '''Send data to the daemon and handle the response.''' - async with self.workqueue_semaphore: - async with aiohttp.post(self.url, data=data) as resp: - result = await resp.json() + async def _send(self, payload, processor): + '''Send a payload to be converted to JSON. - if isinstance(result, list): - errs = [item['error'] for item in result] - if not any(errs): - return [item['result'] for item in result] - if any(err.get('code') == self.WARMING_UP for err in errs if err): - raise DaemonWarmingUpError - raise DaemonError(errs) - else: + Handles temporary connection issues. Daemon reponse errors + are raise through DaemonError. + ''' + prior_msg = None + skip_count = None + + def log_error(msg, skip_once=False): + if skip_once and skip_count is None: + skip_count = 1 + if msg != prior_msg or skip_count == 0: + skip_count = 10 + prior_msg = msg + self.logger.error('{}. Retrying between sleeps...' + .format(msg)) + skip_count -= 1 + + data = json.dumps(payload) + secs = 1 + while True: + try: + async with self.workqueue_semaphore: + async with aiohttp.post(self.url, data=data) as resp: + result = processor(await resp.json()) + if prior_msg: + self.logger.info('connection restored') + return result + except (asyncio.CancelledError, DaemonError): + raise + except asyncio.TimeoutError: + log_error('timeout error', skip_once=True) + except aiohttp.ClientHttpProcessingError: + log_error('HTTP error', skip_once=True) + except aiohttp.ServerDisconnectedError: + log_error('disconnected', skip_once=True) + except aiohttp.ClientConnectionError: + log_error('connection problem - is your daemon running?') + except DaemonWarmingUpError: + log_error('still starting up checking blocks...') + except Exception as e: + log_error('request gave unexpected error: {}'.format(e)) + await asyncio.sleep(secs) + secs = min(16, secs * 2) + + async def _send_single(self, method, params=None): + '''Send a single request to the daemon.''' + def processor(result): err = result['error'] if not err: return result['result'] @@ -66,73 +101,39 @@ class Daemon(util.LoggedClass): raise DaemonWarmingUpError raise DaemonError(err) - async def send(self, payload): - '''Send a payload to be converted to JSON. - - Handles temporary connection issues. Daemon reponse errors - are raise through DaemonError. - ''' - data = json.dumps(payload) - secs = 1 - prior_msg = None - while True: - try: - result = await self.post(data) - if prior_msg: - self.logger.info('connection successfully restored') - return result - except asyncio.TimeoutError: - msg = 'timeout error' - except aiohttp.ClientHttpProcessingError: - msg = 'HTTP error' - except aiohttp.ServerDisconnectedError: - msg = 'disconnected' - except aiohttp.ClientConnectionError: - msg = 'connection problem - is your daemon running?' - except DaemonWarmingUpError: - msg = 'still starting up checking blocks...' - except (asyncio.CancelledError, DaemonError): - raise - except Exception as e: - msg = ('request gave unexpected error: {}'.format(e)) - - if msg != prior_msg or count == 10: - self.logger.error('{}. Retrying between sleeps...' - .format(msg)) - prior_msg = msg - count = 0 - await asyncio.sleep(secs) - count += 1 - secs = min(16, secs * 2) - - async def send_single(self, method, params=None): - '''Send a single request to the daemon.''' payload = {'method': method} if params: payload['params'] = params - return await self.send(payload) + return await self._send(payload, processor) - async def send_many(self, mp_iterable): - '''Send several requests at once.''' - payload = [{'method': m, 'params': p} for m, p in mp_iterable] + async def _send_vector(self, method, params_iterable, replace_errs=False): + '''Send several requests of the same method. + + The result will be an array of the same length as params_iterable. + If replace_errs is true, any item with an error is returned as None, + othewise an exception is raised.''' + def processor(result): + errs = [item['error'] for item in result if item['error']] + if not errs or replace_errs: + return [item['result'] for item in result] + if any(err.get('code') == self.WARMING_UP for err in errs): + raise DaemonWarmingUpError + raise DaemonError(errs) + + payload = [{'method': method, 'params': p} for p in params_iterable] if payload: - return await self.send(payload) + return await self._send(payload, processor) return [] - async def send_vector(self, method, params_iterable): - '''Send several requests of the same method.''' - return await self.send_many((method, params) - for params in params_iterable) - async def block_hex_hashes(self, first, count): '''Return the hex hashes of count block starting at height first.''' params_iterable = ((h, ) for h in range(first, first + count)) - return await self.send_vector('getblockhash', params_iterable) + return await self._send_vector('getblockhash', params_iterable) async def raw_blocks(self, hex_hashes): '''Return the raw binary blocks with the given hex hashes.''' params_iterable = ((h, False) for h in hex_hashes) - blocks = await self.send_vector('getblock', params_iterable) + blocks = await self._send_vector('getblock', params_iterable) # Convert hex string to bytes return [bytes.fromhex(block) for block in blocks] @@ -140,39 +141,40 @@ class Daemon(util.LoggedClass): '''Return the hashes of the txs in the daemon's mempool.''' if self.debug_caught_up: return [] - return await self.send_single('getrawmempool') + return await self._send_single('getrawmempool') async def estimatefee(self, params): '''Return the fee estimate for the given parameters.''' - return await self.send_single('estimatefee', params) + return await self._send_single('estimatefee', params) async def relayfee(self): '''The minimum fee a low-priority tx must pay in order to be accepted to the daemon's memory pool.''' - net_info = await self.send_single('getnetworkinfo') + net_info = await self._send_single('getnetworkinfo') return net_info['relayfee'] async def getrawtransaction(self, hex_hash): '''Return the serialized raw transaction with the given hash.''' - return await self.send_single('getrawtransaction', (hex_hash, 0)) + return await self._send_single('getrawtransaction', (hex_hash, 0)) - async def getrawtransactions(self, hex_hashes): + async def getrawtransactions(self, hex_hashes, replace_errs=True): '''Return the serialized raw transactions with the given hashes. - Breaks large requests up. Yields after each sub request.''' + Replaces errors with None by default.''' params_iterable = ((hex_hash, 0) for hex_hash in hex_hashes) - txs = await self.send_vector('getrawtransaction', params_iterable) + txs = await self._send_vector('getrawtransaction', params_iterable, + replace_errs=replace_errs) # Convert hex strings to bytes - return [bytes.fromhex(tx) for tx in txs] + return [bytes.fromhex(tx) if tx else None for tx in txs] async def sendrawtransaction(self, params): '''Broadcast a transaction to the network.''' - return await self.send_single('sendrawtransaction', params) + return await self._send_single('sendrawtransaction', params) async def height(self): '''Query the daemon for its current height.''' if not self.debug_caught_up: - self._height = await self.send_single('getblockcount') + self._height = await self._send_single('getblockcount') return self._height def cached_height(self):