From 18efa67f1d5e773d3e03a7658d5e51640b4be28a Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sun, 6 Nov 2016 07:26:06 +0900 Subject: [PATCH] Various daemon improvements --- server/block_processor.py | 14 +++---- server/daemon.py | 78 ++++++++++++++++++++++----------------- 2 files changed, 51 insertions(+), 41 deletions(-) diff --git a/server/block_processor.py b/server/block_processor.py index 34391b3..4ea5d55 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -167,7 +167,7 @@ class MemPool(LoggedClass): self.txs = {} self.hash168s = defaultdict(set) # None can be a key self.bp = bp - self.initial = True + self.count = 0 async def update(self, hex_hashes): '''Update state given the current mempool to the passed set of hashes. @@ -178,8 +178,7 @@ class MemPool(LoggedClass): hex_hashes = set(hex_hashes) touched = set() - if self.initial: - self.initial = False + if self.count == 0: self.logger.info('initial fetch of {:,d} daemon mempool txs' .format(len(hex_hashes))) @@ -192,9 +191,6 @@ class MemPool(LoggedClass): for hash168 in hash168s: self.hash168s[hash168].remove(hex_hash) touched.update(hash168s) - if gone: - self.logger.info('{:,d} entries removed from mempool' - .format(len(gone))) # Get the raw transactions for the new hashes. Ignore the # ones the daemon no longer has (it will return None). Put @@ -253,8 +249,10 @@ class MemPool(LoggedClass): self.hash168s[hash168].add(hex_hash) touched.add(hash168) - self.logger.info('{:,d} entries in mempool for {:,d} addresses' - .format(len(self.txs), len(self.hash168s))) + if self.count % 20 == 0: + self.logger.info('{:,d} entries in mempool for {:,d} addresses' + .format(len(self.txs), len(self.hash168s))) + self.count += 1 # Might include a None return touched diff --git a/server/daemon.py b/server/daemon.py index a4682df..6f29142 100644 --- a/server/daemon.py +++ b/server/daemon.py @@ -17,8 +17,11 @@ import lib.util as util class DaemonError(Exception): - '''Raised when the daemon returns an error in its results that - cannot be remedied by retrying.''' + '''Raised when the daemon returns an error in its results.''' + + +class DaemonWarmingUpError(DaemonError): + '''Raised when the daemon returns an error in its results.''' class Daemon(util.LoggedClass): @@ -39,42 +42,59 @@ class Daemon(util.LoggedClass): .format(height)) self._height = height - @classmethod - def is_warming_up(cls, err): - if not isinstance(err, list): - err = [err] - return any(elt.get('code') == cls.WARMING_UP for elt in err) + async def post(self, data): + '''Send data to the daemon and handle the response.''' + async with aiohttp.post(self.url, data=data) as resp: + result = await resp.json() + + if isinstance(result, list): + errs = tuple(item['error'] for item in result) + if not any(errs): + return tuple(item['result'] for item in result) + if any(err.get('code') == self.WARMING_UP for err in errs if err): + raise DaemonWarmingUpError + raise DaemonError(errs) + else: + err = result['error'] + if not err: + return result['result'] + if err.get('code') == self.WARMING_UP: + raise DaemonWarmingUpError + raise DaemonError(err) async def send(self, payload): - '''Send a payload to be converted to JSON.''' + '''Send a payload to be converted to JSON. + + Handles temporary connection issues. Daemon reponse errors + are raise through DaemonError. + ''' data = json.dumps(payload) secs = 1 while True: try: - async with aiohttp.post(self.url, data=data) as resp: - result = await resp.json() - if not self.is_warming_up(result): - return result - msg = 'daemon is still warming up' + return await self.post(data) except asyncio.TimeoutError: msg = 'timeout error' - except aiohttp.DisconnectedError as e: - msg = '{}: {}'.format(e.__class__.__name__, e) + except aiohttp.ClientHttpProcessingError: + msg = 'HTTP error' + except aiohttp.ServerDisconnectedError: + msg = 'daemon disconnected' + except aiohttp.ClientConnectionError: + msg = 'connection problem - is your daemon running?' + except DaemonWarmingUpError: + msg = 'daemon is still warming up' - secs = min(180, secs * 2) self.logger.error('{}. Sleeping {:d}s and trying again...' .format(msg, secs)) await asyncio.sleep(secs) + secs = min(180, secs * 2) async def send_single(self, method, params=None): '''Send a single request to the daemon.''' payload = {'method': method} if params: payload['params'] = params - item = await self.send(payload) - if item['error']: - raise DaemonError(item['error']) - return item['result'] + return await self.send(payload) async def send_many(self, mp_iterable): '''Send several requests at once. @@ -82,11 +102,7 @@ class Daemon(util.LoggedClass): The results are returned as a tuple.''' payload = tuple({'method': m, 'params': p} for m, p in mp_iterable) if payload: - items = await self.send(payload) - errs = tuple(item['error'] for item in items) - if any(errs): - raise DaemonError(errs) - return tuple(item['result'] for item in items) + return await self.send(payload) return () async def send_vector(self, method, params_iterable): @@ -132,14 +148,10 @@ class Daemon(util.LoggedClass): '''Return the serialized raw transactions with the given hashes. Breaks large requests up. Yields after each sub request.''' - param_lists = tuple((hex_hash, 0) for hex_hash in hex_hashes) - raw_txs = [] - for chunk in util.chunks(param_lists, 10000): - txs = await self.send_vector('getrawtransaction', chunk) - # Convert hex strings to bytes - raw_txs.append(tuple(bytes.fromhex(tx) for tx in txs)) - await asyncio.sleep(0) - return sum(raw_txs, ()) + params_iterable = ((hex_hash, 0) for hex_hash in hex_hashes) + txs = await self.send_vector('getrawtransaction', params_iterable) + # Convert hex strings to bytes + return tuple(bytes.fromhex(tx) for tx in txs) async def sendrawtransaction(self, params): '''Broadcast a transaction to the network.'''