Clean up RPC handling
Remove class Handle exceptions properly by cancelling tasks Log what is happening Generalise send interface
This commit is contained in:
parent
4b99ae4e11
commit
370cceab83
155
server/server.py
155
server/server.py
@ -19,27 +19,37 @@ class Server(object):
|
|||||||
def __init__(self, env):
|
def __init__(self, env):
|
||||||
self.env = env
|
self.env = env
|
||||||
self.db = DB(env)
|
self.db = DB(env)
|
||||||
self.rpc = RPC(env)
|
self.block_cache = BlockCache(env, self.db)
|
||||||
self.block_cache = BlockCache(env, self.db, self.rpc)
|
self.tasks = [
|
||||||
|
|
||||||
def async_tasks(self):
|
|
||||||
return [
|
|
||||||
asyncio.ensure_future(self.block_cache.catch_up()),
|
asyncio.ensure_future(self.block_cache.catch_up()),
|
||||||
asyncio.ensure_future(self.block_cache.process_cache()),
|
asyncio.ensure_future(self.block_cache.process_cache()),
|
||||||
]
|
]
|
||||||
|
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
for signame in ('SIGINT', 'SIGTERM'):
|
||||||
|
loop.add_signal_handler(getattr(signal, signame),
|
||||||
|
partial(self.on_signal, signame))
|
||||||
|
|
||||||
|
def on_signal(self, signame):
|
||||||
|
logging.warning('received {} signal, preparing to shut down'
|
||||||
|
.format(signame))
|
||||||
|
for task in self.tasks:
|
||||||
|
task.cancel()
|
||||||
|
|
||||||
|
def async_tasks(self):
|
||||||
|
return self.tasks
|
||||||
|
|
||||||
|
|
||||||
class BlockCache(object):
|
class BlockCache(object):
|
||||||
'''Requests blocks ahead of time from the daemon. Serves them
|
'''Requests blocks ahead of time from the daemon. Serves them
|
||||||
to the blockchain processor.'''
|
to the blockchain processor.'''
|
||||||
|
|
||||||
def __init__(self, env, db, rpc):
|
def __init__(self, env, db):
|
||||||
self.logger = logging.getLogger('BlockCache')
|
self.logger = logging.getLogger('BlockCache')
|
||||||
self.logger.setLevel(logging.INFO)
|
self.logger.setLevel(logging.INFO)
|
||||||
|
|
||||||
self.db = db
|
self.db = db
|
||||||
self.rpc = rpc
|
self.rpc_url = env.rpc_url
|
||||||
self.stop = False
|
|
||||||
# Cache target size is in MB. Has little effect on sync time.
|
# Cache target size is in MB. Has little effect on sync time.
|
||||||
self.cache_limit = 10
|
self.cache_limit = 10
|
||||||
self.daemon_height = 0
|
self.daemon_height = 0
|
||||||
@ -49,37 +59,26 @@ class BlockCache(object):
|
|||||||
self.recent_sizes = []
|
self.recent_sizes = []
|
||||||
self.ave_size = 0
|
self.ave_size = 0
|
||||||
|
|
||||||
loop = asyncio.get_event_loop()
|
self.logger.info('using RPC URL {}'.format(self.rpc_url))
|
||||||
for signame in ('SIGINT', 'SIGTERM'):
|
|
||||||
loop.add_signal_handler(getattr(signal, signame),
|
|
||||||
partial(self.on_signal, signame))
|
|
||||||
|
|
||||||
def on_signal(self, signame):
|
|
||||||
logging.warning('Received {} signal, preparing to shut down'
|
|
||||||
.format(signame))
|
|
||||||
self.blocks = []
|
|
||||||
self.stop = True
|
|
||||||
|
|
||||||
async def process_cache(self):
|
async def process_cache(self):
|
||||||
while not self.stop:
|
while True:
|
||||||
await asyncio.sleep(1)
|
await asyncio.sleep(1)
|
||||||
while self.blocks:
|
while self.blocks:
|
||||||
self.db.process_block(self.blocks.pop(), self.daemon_height)
|
self.db.process_block(self.blocks.pop(), self.daemon_height)
|
||||||
# Release asynchronous block fetching
|
# Release asynchronous block fetching
|
||||||
await asyncio.sleep(0)
|
await asyncio.sleep(0)
|
||||||
|
|
||||||
self.db.flush_all(self.daemon_height)
|
|
||||||
|
|
||||||
async def catch_up(self):
|
async def catch_up(self):
|
||||||
self.logger.info('catching up, block cache limit {:d}MB...'
|
self.logger.info('catching up, block cache limit {:d}MB...'
|
||||||
.format(self.cache_limit))
|
.format(self.cache_limit))
|
||||||
|
|
||||||
while await self.maybe_prefill():
|
try:
|
||||||
await asyncio.sleep(1)
|
while await self.maybe_prefill():
|
||||||
|
await asyncio.sleep(1)
|
||||||
if not self.stop:
|
|
||||||
self.logger.info('caught up to height {:d}'
|
self.logger.info('caught up to height {:d}'
|
||||||
.format(self.daemon_height))
|
.format(self.daemon_height))
|
||||||
|
finally:
|
||||||
self.db.flush_all(self.daemon_height)
|
self.db.flush_all(self.daemon_height)
|
||||||
|
|
||||||
def cache_used(self):
|
def cache_used(self):
|
||||||
@ -96,35 +95,26 @@ class BlockCache(object):
|
|||||||
processing.'''
|
processing.'''
|
||||||
cache_limit = self.cache_limit * 1024 * 1024
|
cache_limit = self.cache_limit * 1024 * 1024
|
||||||
while True:
|
while True:
|
||||||
if self.stop:
|
|
||||||
return False
|
|
||||||
|
|
||||||
cache_used = self.cache_used()
|
cache_used = self.cache_used()
|
||||||
if cache_used > cache_limit:
|
if cache_used > cache_limit:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
# Keep going by getting a whole new cache_limit of blocks
|
# Keep going by getting a whole new cache_limit of blocks
|
||||||
self.daemon_height = await self.rpc.rpc_single('getblockcount')
|
self.daemon_height = await self.send_single('getblockcount')
|
||||||
max_count = min(self.daemon_height - self.fetched_height, 4000)
|
max_count = min(self.daemon_height - self.fetched_height, 4000)
|
||||||
count = min(max_count, self.prefill_count(cache_limit))
|
count = min(max_count, self.prefill_count(cache_limit))
|
||||||
if not count or self.stop:
|
if not count:
|
||||||
return False # Done catching up
|
return False # Done catching up
|
||||||
|
|
||||||
first = self.fetched_height + 1
|
first = self.fetched_height + 1
|
||||||
param_lists = [[height] for height in range(first, first + count)]
|
param_lists = [[height] for height in range(first, first + count)]
|
||||||
hashes = await self.rpc.rpc_multi('getblockhash', param_lists)
|
hashes = await self.send_vector('getblockhash', param_lists)
|
||||||
|
|
||||||
if self.stop:
|
|
||||||
return False
|
|
||||||
|
|
||||||
# Hashes is an array of hex strings
|
# Hashes is an array of hex strings
|
||||||
param_lists = [(h, False) for h in hashes]
|
param_lists = [(h, False) for h in hashes]
|
||||||
blocks = await self.rpc.rpc_multi('getblock', param_lists)
|
blocks = await self.send_vector('getblock', param_lists)
|
||||||
self.fetched_height += count
|
self.fetched_height += count
|
||||||
|
|
||||||
if self.stop:
|
|
||||||
return False
|
|
||||||
|
|
||||||
# Convert hex string to bytes and put in memoryview
|
# Convert hex string to bytes and put in memoryview
|
||||||
blocks = [bytes.fromhex(block) for block in blocks]
|
blocks = [bytes.fromhex(block) for block in blocks]
|
||||||
# Reverse order and place at front of list
|
# Reverse order and place at front of list
|
||||||
@ -138,64 +128,47 @@ class BlockCache(object):
|
|||||||
self.recent_sizes = self.recent_sizes[excess:]
|
self.recent_sizes = self.recent_sizes[excess:]
|
||||||
self.ave_size = sum(self.recent_sizes) // len(self.recent_sizes)
|
self.ave_size = sum(self.recent_sizes) // len(self.recent_sizes)
|
||||||
|
|
||||||
|
async def send_single(self, method, params=None):
|
||||||
class RPC(object):
|
|
||||||
|
|
||||||
def __init__(self, env):
|
|
||||||
self.logger = logging.getLogger('RPC')
|
|
||||||
self.logger.setLevel(logging.INFO)
|
|
||||||
self.rpc_url = env.rpc_url
|
|
||||||
self.logger.info('using RPC URL {}'.format(self.rpc_url))
|
|
||||||
|
|
||||||
async def rpc_multi(self, method, param_lists):
|
|
||||||
payload = [{'method': method, 'params': param_list}
|
|
||||||
for param_list in param_lists]
|
|
||||||
while True:
|
|
||||||
dresults = await self.daemon(payload)
|
|
||||||
errs = [dresult['error'] for dresult in dresults]
|
|
||||||
if not any(errs):
|
|
||||||
return [dresult['result'] for dresult in dresults]
|
|
||||||
for err in errs:
|
|
||||||
if err.get('code') == -28:
|
|
||||||
self.logger.warning('daemon still warming up...')
|
|
||||||
secs = 10
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
self.logger.error('daemon returned errors: {}'.format(errs))
|
|
||||||
secs = 0
|
|
||||||
self.logger.info('sleeping {:d} seconds and trying again...'
|
|
||||||
.format(secs))
|
|
||||||
await asyncio.sleep(secs)
|
|
||||||
|
|
||||||
|
|
||||||
async def rpc_single(self, method, params=None):
|
|
||||||
payload = {'method': method}
|
payload = {'method': method}
|
||||||
if params:
|
if params:
|
||||||
payload['params'] = params
|
payload['params'] = params
|
||||||
while True:
|
result, = await self.send((payload, ))
|
||||||
dresult = await self.daemon(payload)
|
return result
|
||||||
err = dresult['error']
|
|
||||||
if not err:
|
|
||||||
return dresult['result']
|
|
||||||
if err.get('code') == -28:
|
|
||||||
self.logger.warning('daemon still warming up...')
|
|
||||||
secs = 10
|
|
||||||
else:
|
|
||||||
self.logger.error('daemon returned error: {}'.format(err))
|
|
||||||
secs = 0
|
|
||||||
self.logger.info('sleeping {:d} seconds and trying again...'
|
|
||||||
.format(secs))
|
|
||||||
await asyncio.sleep(secs)
|
|
||||||
|
|
||||||
async def daemon(self, payload):
|
async def send_many(self, mp_pairs):
|
||||||
|
payload = [{'method': method, 'params': params}
|
||||||
|
for method, params in mp_pairs]
|
||||||
|
return await self.send(payload)
|
||||||
|
|
||||||
|
async def send_vector(self, method, params_list):
|
||||||
|
payload = [{'method': method, 'params': params}
|
||||||
|
for params in params_list]
|
||||||
|
return await self.send(payload)
|
||||||
|
|
||||||
|
async def send(self, payload):
|
||||||
|
assert isinstance(payload, (tuple, list))
|
||||||
|
data = json.dumps(payload)
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
async with aiohttp.ClientSession() as session:
|
async with aiohttp.request('POST', self.rpc_url,
|
||||||
async with session.post(self.rpc_url,
|
data = data) as resp:
|
||||||
data=json.dumps(payload)) as resp:
|
result = await resp.json()
|
||||||
return await resp.json()
|
except asyncio.CancelledError:
|
||||||
|
raise
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.logger.error('aiohttp error: {}'.format(e))
|
msg = 'aiohttp error: {}'.format(e)
|
||||||
|
secs = 3
|
||||||
|
else:
|
||||||
|
errs = tuple(item['error'] for item in result)
|
||||||
|
if not any(errs):
|
||||||
|
return tuple(item['result'] for item in result)
|
||||||
|
if any(err.get('code') == -28 for err in errs):
|
||||||
|
msg = 'daemon still warming up...'
|
||||||
|
secs = 10
|
||||||
|
else:
|
||||||
|
msg = 'daemon errors: {}'.format(errs)
|
||||||
|
secs = 1
|
||||||
|
|
||||||
self.logger.info('sleeping 1 second and trying again...')
|
self.logger.error('{}. Sleeping {:d}s and trying again...'
|
||||||
await asyncio.sleep(1)
|
.format(msg, secs))
|
||||||
|
await asyncio.sleep(secs)
|
||||||
|
|||||||
@ -28,6 +28,8 @@ def main_loop():
|
|||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
try:
|
try:
|
||||||
loop.run_until_complete(asyncio.gather(*tasks))
|
loop.run_until_complete(asyncio.gather(*tasks))
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
logging.warning('task cancelled; asyncio event loop closing')
|
||||||
finally:
|
finally:
|
||||||
loop.close()
|
loop.close()
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user