Various daemon improvements

This commit is contained in:
Neil Booth 2016-11-06 07:26:06 +09:00
parent 1b589d3d1f
commit 18efa67f1d
2 changed files with 51 additions and 41 deletions

View File

@ -167,7 +167,7 @@ class MemPool(LoggedClass):
self.txs = {} self.txs = {}
self.hash168s = defaultdict(set) # None can be a key self.hash168s = defaultdict(set) # None can be a key
self.bp = bp self.bp = bp
self.initial = True self.count = 0
async def update(self, hex_hashes): async def update(self, hex_hashes):
'''Update state given the current mempool to the passed set of hashes. '''Update state given the current mempool to the passed set of hashes.
@ -178,8 +178,7 @@ class MemPool(LoggedClass):
hex_hashes = set(hex_hashes) hex_hashes = set(hex_hashes)
touched = set() touched = set()
if self.initial: if self.count == 0:
self.initial = False
self.logger.info('initial fetch of {:,d} daemon mempool txs' self.logger.info('initial fetch of {:,d} daemon mempool txs'
.format(len(hex_hashes))) .format(len(hex_hashes)))
@ -192,9 +191,6 @@ class MemPool(LoggedClass):
for hash168 in hash168s: for hash168 in hash168s:
self.hash168s[hash168].remove(hex_hash) self.hash168s[hash168].remove(hex_hash)
touched.update(hash168s) touched.update(hash168s)
if gone:
self.logger.info('{:,d} entries removed from mempool'
.format(len(gone)))
# Get the raw transactions for the new hashes. Ignore the # Get the raw transactions for the new hashes. Ignore the
# ones the daemon no longer has (it will return None). Put # ones the daemon no longer has (it will return None). Put
@ -253,8 +249,10 @@ class MemPool(LoggedClass):
self.hash168s[hash168].add(hex_hash) self.hash168s[hash168].add(hex_hash)
touched.add(hash168) touched.add(hash168)
self.logger.info('{:,d} entries in mempool for {:,d} addresses' if self.count % 20 == 0:
.format(len(self.txs), len(self.hash168s))) self.logger.info('{:,d} entries in mempool for {:,d} addresses'
.format(len(self.txs), len(self.hash168s)))
self.count += 1
# Might include a None # Might include a None
return touched return touched

View File

@ -17,8 +17,11 @@ import lib.util as util
class DaemonError(Exception): class DaemonError(Exception):
'''Raised when the daemon returns an error in its results that '''Raised when the daemon returns an error in its results.'''
cannot be remedied by retrying.'''
class DaemonWarmingUpError(DaemonError):
'''Raised when the daemon returns an error in its results.'''
class Daemon(util.LoggedClass): class Daemon(util.LoggedClass):
@ -39,42 +42,59 @@ class Daemon(util.LoggedClass):
.format(height)) .format(height))
self._height = height self._height = height
@classmethod async def post(self, data):
def is_warming_up(cls, err): '''Send data to the daemon and handle the response.'''
if not isinstance(err, list): async with aiohttp.post(self.url, data=data) as resp:
err = [err] result = await resp.json()
return any(elt.get('code') == cls.WARMING_UP for elt in err)
if isinstance(result, list):
errs = tuple(item['error'] for item in result)
if not any(errs):
return tuple(item['result'] for item in result)
if any(err.get('code') == self.WARMING_UP for err in errs if err):
raise DaemonWarmingUpError
raise DaemonError(errs)
else:
err = result['error']
if not err:
return result['result']
if err.get('code') == self.WARMING_UP:
raise DaemonWarmingUpError
raise DaemonError(err)
async def send(self, payload): async def send(self, payload):
'''Send a payload to be converted to JSON.''' '''Send a payload to be converted to JSON.
Handles temporary connection issues. Daemon reponse errors
are raise through DaemonError.
'''
data = json.dumps(payload) data = json.dumps(payload)
secs = 1 secs = 1
while True: while True:
try: try:
async with aiohttp.post(self.url, data=data) as resp: return await self.post(data)
result = await resp.json()
if not self.is_warming_up(result):
return result
msg = 'daemon is still warming up'
except asyncio.TimeoutError: except asyncio.TimeoutError:
msg = 'timeout error' msg = 'timeout error'
except aiohttp.DisconnectedError as e: except aiohttp.ClientHttpProcessingError:
msg = '{}: {}'.format(e.__class__.__name__, e) msg = 'HTTP error'
except aiohttp.ServerDisconnectedError:
msg = 'daemon disconnected'
except aiohttp.ClientConnectionError:
msg = 'connection problem - is your daemon running?'
except DaemonWarmingUpError:
msg = 'daemon is still warming up'
secs = min(180, secs * 2)
self.logger.error('{}. Sleeping {:d}s and trying again...' self.logger.error('{}. Sleeping {:d}s and trying again...'
.format(msg, secs)) .format(msg, secs))
await asyncio.sleep(secs) await asyncio.sleep(secs)
secs = min(180, secs * 2)
async def send_single(self, method, params=None): async def send_single(self, method, params=None):
'''Send a single request to the daemon.''' '''Send a single request to the daemon.'''
payload = {'method': method} payload = {'method': method}
if params: if params:
payload['params'] = params payload['params'] = params
item = await self.send(payload) return await self.send(payload)
if item['error']:
raise DaemonError(item['error'])
return item['result']
async def send_many(self, mp_iterable): async def send_many(self, mp_iterable):
'''Send several requests at once. '''Send several requests at once.
@ -82,11 +102,7 @@ class Daemon(util.LoggedClass):
The results are returned as a tuple.''' The results are returned as a tuple.'''
payload = tuple({'method': m, 'params': p} for m, p in mp_iterable) payload = tuple({'method': m, 'params': p} for m, p in mp_iterable)
if payload: if payload:
items = await self.send(payload) return await self.send(payload)
errs = tuple(item['error'] for item in items)
if any(errs):
raise DaemonError(errs)
return tuple(item['result'] for item in items)
return () return ()
async def send_vector(self, method, params_iterable): async def send_vector(self, method, params_iterable):
@ -132,14 +148,10 @@ class Daemon(util.LoggedClass):
'''Return the serialized raw transactions with the given hashes. '''Return the serialized raw transactions with the given hashes.
Breaks large requests up. Yields after each sub request.''' Breaks large requests up. Yields after each sub request.'''
param_lists = tuple((hex_hash, 0) for hex_hash in hex_hashes) params_iterable = ((hex_hash, 0) for hex_hash in hex_hashes)
raw_txs = [] txs = await self.send_vector('getrawtransaction', params_iterable)
for chunk in util.chunks(param_lists, 10000): # Convert hex strings to bytes
txs = await self.send_vector('getrawtransaction', chunk) return tuple(bytes.fromhex(tx) for tx in txs)
# Convert hex strings to bytes
raw_txs.append(tuple(bytes.fromhex(tx) for tx in txs))
await asyncio.sleep(0)
return sum(raw_txs, ())
async def sendrawtransaction(self, params): async def sendrawtransaction(self, params):
'''Broadcast a transaction to the network.''' '''Broadcast a transaction to the network.'''