diff --git a/server/daemon.py b/server/daemon.py index c4a3cc6..4696d76 100644 --- a/server/daemon.py +++ b/server/daemon.py @@ -45,20 +45,55 @@ class Daemon(util.LoggedClass): .format(height)) self._height = height - async def _post(self, data): - '''Send data to the daemon and handle the response.''' - async with self.workqueue_semaphore: - async with aiohttp.post(self.url, data=data) as resp: - result = await resp.json() + async def _send(self, payload, processor): + '''Send a payload to be converted to JSON. - if isinstance(result, list): - errs = [item['error'] for item in result] - if not any(errs): - return [item['result'] for item in result] - if any(err.get('code') == self.WARMING_UP for err in errs if err): - raise DaemonWarmingUpError - raise DaemonError(errs) - else: + Handles temporary connection issues. Daemon reponse errors + are raise through DaemonError. + ''' + prior_msg = None + skip_count = None + + def log_error(msg, skip_once=False): + if skip_once and skip_count is None: + skip_count = 1 + if msg != prior_msg or skip_count == 0: + skip_count = 10 + prior_msg = msg + self.logger.error('{}. Retrying between sleeps...' + .format(msg)) + skip_count -= 1 + + data = json.dumps(payload) + secs = 1 + while True: + try: + async with self.workqueue_semaphore: + async with aiohttp.post(self.url, data=data) as resp: + result = processor(await resp.json()) + if prior_msg: + self.logger.info('connection restored') + return result + except (asyncio.CancelledError, DaemonError): + raise + except asyncio.TimeoutError: + log_error('timeout error', skip_once=True) + except aiohttp.ClientHttpProcessingError: + log_error('HTTP error', skip_once=True) + except aiohttp.ServerDisconnectedError: + log_error('disconnected', skip_once=True) + except aiohttp.ClientConnectionError: + log_error('connection problem - is your daemon running?') + except DaemonWarmingUpError: + log_error('still starting up checking blocks...') + except Exception as e: + log_error('request gave unexpected error: {}'.format(e)) + await asyncio.sleep(secs) + secs = min(16, secs * 2) + + async def _send_single(self, method, params=None): + '''Send a single request to the daemon.''' + def processor(result): err = result['error'] if not err: return result['result'] @@ -66,62 +101,28 @@ class Daemon(util.LoggedClass): raise DaemonWarmingUpError raise DaemonError(err) - async def _send(self, payload): - '''Send a payload to be converted to JSON. - - Handles temporary connection issues. Daemon reponse errors - are raise through DaemonError. - ''' - secs = 1 - prior_msg = None - skip_count = 10 - - async def sleep(msg, *, skip_once=False): - skip_count -= 1 - if skip_once and msg != prior_msg: - skip_count = 1 - elif msg != prior_msg or skip_count == 0: - self.logger.error('{}. Retrying between sleeps...' - .format(msg)) - skip_count = 10 - prior_msg = msg - await asyncio.sleep(secs) - secs = min(16, secs * 2) - - data = json.dumps(payload) - while True: - try: - result = await self._post(data) - if prior_msg: - self.logger.info('connection successfully restored') - return result - except (asyncio.CancelledError, DaemonError): - raise - except asyncio.TimeoutError: - sleep('timeout error', skip_once=True) - except aiohttp.ClientHttpProcessingError: - sleep('HTTP error', skip_once=True) - except aiohttp.ServerDisconnectedError: - sleep('disconnected', skip_once=True) - except aiohttp.ClientConnectionError: - sleep('connection problem - is your daemon running?') - except DaemonWarmingUpError: - sleep('still starting up checking blocks...') - except Exception as e: - sleep('request gave unexpected error: {}'.format(e)) - - async def _send_single(self, method, params=None): - '''Send a single request to the daemon.''' payload = {'method': method} if params: payload['params'] = params - return await self._send(payload) + return await self._send(payload, processor) + + async def _send_vector(self, method, params_iterable, replace_errs=False): + '''Send several requests of the same method. + + The result will be an array of the same length as params_iterable. + If replace_errs is true, any item with an error is returned as None, + othewise an exception is raised.''' + def processor(result): + errs = [item['error'] for item in result if item['error']] + if not errs or replace_errs: + return [item['result'] for item in result] + if any(err.get('code') == self.WARMING_UP for err in errs): + raise DaemonWarmingUpError + raise DaemonError(errs) - async def _send_vector(self, method, params_iterable): - '''Send several requests of the same method.''' payload = [{'method': method, 'params': p} for p in params_iterable] if payload: - return await self._send(payload) + return await self._send(payload, processor) return [] async def block_hex_hashes(self, first, count): @@ -156,14 +157,15 @@ class Daemon(util.LoggedClass): '''Return the serialized raw transaction with the given hash.''' return await self._send_single('getrawtransaction', (hex_hash, 0)) - async def getrawtransactions(self, hex_hashes): + async def getrawtransactions(self, hex_hashes, replace_errs=True): '''Return the serialized raw transactions with the given hashes. - Breaks large requests up. Yields after each sub request.''' + Replaces errors with None by default.''' params_iterable = ((hex_hash, 0) for hex_hash in hex_hashes) - txs = await self._send_vector('getrawtransaction', params_iterable) + txs = await self._send_vector('getrawtransaction', params_iterable, + replace_errs=replace_errs) # Convert hex strings to bytes - return [bytes.fromhex(tx) for tx in txs] + return [bytes.fromhex(tx) if tx else None for tx in txs] async def sendrawtransaction(self, params): '''Broadcast a transaction to the network.'''