Separate async item processor per session

Improve daemon wait logic

Fixes #100
This commit is contained in:
Neil Booth 2017-02-19 14:19:26 +09:00
parent 23b7ec3fb5
commit 86f6a148b9
6 changed files with 106 additions and 136 deletions

View File

@ -26,13 +26,9 @@ class RPCClient(JSONSession):
super().__init__(version=JSONRPCv2) super().__init__(version=JSONRPCv2)
self.max_send = 0 self.max_send = 0
self.max_buffer_size = 5*10**6 self.max_buffer_size = 5*10**6
self.event = asyncio.Event()
def have_pending_items(self):
self.event.set()
async def wait_for_response(self): async def wait_for_response(self):
await self.event.wait() await self.items_event.wait()
await self.process_pending_items() await self.process_pending_items()
def send_rpc_request(self, method, params): def send_rpc_request(self, method, params):

View File

@ -258,7 +258,7 @@ class JSONSessionBase(util.LoggedClass):
from empty from empty
''' '''
_next_session_id = 0 _next_session_id = 0
_pending_reqs = {} _pending_reqs = {} # Outgoing requests waiting for a response
@classmethod @classmethod
def next_session_id(cls): def next_session_id(cls):
@ -320,6 +320,7 @@ class JSONSessionBase(util.LoggedClass):
self.pause = False self.pause = False
# Handling of incoming items # Handling of incoming items
self.items = collections.deque() self.items = collections.deque()
self.items_event = asyncio.Event()
self.batch_results = [] self.batch_results = []
# Handling of outgoing requests # Handling of outgoing requests
self.next_request_id = 0 self.next_request_id = 0
@ -461,10 +462,8 @@ class JSONSessionBase(util.LoggedClass):
self.send_error('empty batch', JSONRPC.INVALID_REQUEST) self.send_error('empty batch', JSONRPC.INVALID_REQUEST)
return return
# Incoming items get queued for later asynchronous processing.
if not self.items:
self.have_pending_items()
self.items.append(payload) self.items.append(payload)
self.items_event.set()
async def process_batch(self, batch, count): async def process_batch(self, batch, count):
'''Processes count items from the batch according to the JSON 2.0 '''Processes count items from the batch according to the JSON 2.0
@ -626,6 +625,9 @@ class JSONSessionBase(util.LoggedClass):
if binary: if binary:
self.send_binary(binary) self.send_binary(binary)
if not self.items:
self.items_event.clear()
def count_pending_items(self): def count_pending_items(self):
'''Counts the number of pending items.''' '''Counts the number of pending items.'''
return sum(len(item) if isinstance(item, list) else 1 return sum(len(item) if isinstance(item, list) else 1
@ -716,15 +718,6 @@ class JSONSessionBase(util.LoggedClass):
# App layer # App layer
def have_pending_items(self):
'''Called to indicate there are items pending to be processed
asynchronously by calling process_pending_items.
This is *not* called every time an item is added, just when
there were previously none and now there is at least one.
'''
raise NotImplementedError
def using_bandwidth(self, amount): def using_bandwidth(self, amount):
'''Called as bandwidth is consumed. '''Called as bandwidth is consumed.
@ -749,8 +742,12 @@ class JSONSession(JSONSessionBase, asyncio.Protocol):
'''A JSONSessionBase instance specialized for use with '''A JSONSessionBase instance specialized for use with
asyncio.protocol to implement the transport layer. asyncio.protocol to implement the transport layer.
Derived classes must provide have_pending_items() and may want to The app should await on items_event, which is set when unprocessed
override the request and notification handlers. incoming items remain and cleared when the queue is empty, and
then arrange to call process_pending_items asynchronously.
Derived classes may want to override the request and notification
handlers.
''' '''
def __init__(self, version=JSONRPCCompat): def __init__(self, version=JSONRPCCompat):

View File

@ -69,9 +69,6 @@ class Controller(util.LoggedClass):
self.next_stale_check = 0 self.next_stale_check = 0
self.history_cache = pylru.lrucache(256) self.history_cache = pylru.lrucache(256)
self.header_cache = pylru.lrucache(8) self.header_cache = pylru.lrucache(8)
self.queue = asyncio.PriorityQueue()
self.delayed_sessions = []
self.next_queue_id = 0
self.cache_height = 0 self.cache_height = 0
env.max_send = max(350000, env.max_send) env.max_send = max(350000, env.max_send)
self.setup_bands() self.setup_bands()
@ -136,67 +133,6 @@ class Controller(util.LoggedClass):
def is_deprioritized(self, session): def is_deprioritized(self, session):
return self.session_priority(session) > self.BANDS return self.session_priority(session) > self.BANDS
async def enqueue_delayed_sessions(self):
while True:
now = time.time()
keep = []
for pair in self.delayed_sessions:
timeout, item = pair
priority, queue_id, session = item
if not session.pause and timeout <= now:
self.queue.put_nowait(item)
else:
keep.append(pair)
self.delayed_sessions = keep
# If paused and session count has fallen, start listening again
if (len(self.sessions) <= self.low_watermark
and self.state == self.PAUSED):
await self.start_external_servers()
# Periodically log sessions
if self.env.log_sessions and time.time() > self.next_log_sessions:
if self.next_log_sessions:
data = self.session_data(for_log=True)
for line in Controller.sessions_text_lines(data):
self.logger.info(line)
self.logger.info(json.dumps(self.getinfo()))
self.next_log_sessions = time.time() + self.env.log_sessions
await asyncio.sleep(1)
def enqueue_session(self, session):
# Might have disconnected whilst waiting
if session not in self.sessions:
return
priority = self.session_priority(session)
item = (priority, self.next_queue_id, session)
self.next_queue_id += 1
excess = max(0, priority - self.BANDS)
if excess != session.last_delay:
session.last_delay = excess
if excess:
session.log_info('high bandwidth use, deprioritizing by '
'delaying responses {:d}s'.format(excess))
else:
session.log_info('stopped delaying responses')
delay = max(int(session.pause), excess)
if delay:
self.delayed_sessions.append((time.time() + delay, item))
else:
self.queue.put_nowait(item)
async def serve_requests(self):
'''Asynchronously run through the task queue.'''
while True:
priority_, id_, session = await self.queue.get()
if session in self.sessions:
await session.process_pending_items()
# Re-enqueue the session if stuff is left
if session.items:
self.enqueue_session(session)
async def run_in_executor(self, func, *args): async def run_in_executor(self, func, *args):
'''Wait whilst running func in the executor.''' '''Wait whilst running func in the executor.'''
return await self.loop.run_in_executor(None, func, *args) return await self.loop.run_in_executor(None, func, *args)
@ -225,11 +161,30 @@ class Controller(util.LoggedClass):
except Exception: except Exception:
self.log_error(traceback.format_exc()) self.log_error(traceback.format_exc())
async def check_request_timeouts(self): async def housekeeping(self):
'''Regularly check pending JSON requests for timeouts.''' '''Regular housekeeping checks.'''
n = 0
while True: while True:
await asyncio.sleep(30) n += 1
await asyncio.sleep(15)
JSONSessionBase.timeout_check() JSONSessionBase.timeout_check()
if n % 10 == 0:
self.clear_stale_sessions()
# Start listening for incoming connections if paused and
# session count has fallen
if (self.state == self.PAUSED and
len(self.sessions) <= self.low_watermark):
await self.start_external_servers()
# Periodically log sessions
if self.env.log_sessions and time.time() > self.next_log_sessions:
if self.next_log_sessions:
data = self.session_data(for_log=True)
for line in Controller.sessions_text_lines(data):
self.logger.info(line)
self.logger.info(json.dumps(self.getinfo()))
self.next_log_sessions = time.time() + self.env.log_sessions
async def wait_for_bp_catchup(self): async def wait_for_bp_catchup(self):
'''Called when the block processor catches up.''' '''Called when the block processor catches up.'''
@ -237,12 +192,9 @@ class Controller(util.LoggedClass):
self.logger.info('block processor has caught up') self.logger.info('block processor has caught up')
self.ensure_future(self.peer_mgr.main_loop()) self.ensure_future(self.peer_mgr.main_loop())
self.ensure_future(self.start_servers()) self.ensure_future(self.start_servers())
self.ensure_future(self.check_request_timeouts()) self.ensure_future(self.housekeeping())
self.ensure_future(self.mempool.main_loop()) self.ensure_future(self.mempool.main_loop())
self.ensure_future(self.enqueue_delayed_sessions())
self.ensure_future(self.notify()) self.ensure_future(self.notify())
for n in range(4):
self.ensure_future(self.serve_requests())
async def main_loop(self): async def main_loop(self):
'''Controller main loop.''' '''Controller main loop.'''
@ -379,11 +331,28 @@ class Controller(util.LoggedClass):
self.header_cache[height] = header self.header_cache[height] = header
return header return header
def session_delay(self, session):
priority = self.session_priority(session)
excess = max(0, priority - self.BANDS)
if excess != session.last_delay:
session.last_delay = excess
if excess:
session.log_info('high bandwidth use, deprioritizing by '
'delaying responses {:d}s'.format(excess))
else:
session.log_info('stopped delaying responses')
return max(int(session.pause), excess)
async def process_items(self, session):
'''Waits for incoming session items and processes them.'''
while True:
await session.items_event.wait()
await asyncio.sleep(self.session_delay(session))
if not session.pause:
await session.process_pending_items()
def add_session(self, session): def add_session(self, session):
now = time.time() session.items_future = self.ensure_future(self.process_items(session))
if now > self.next_stale_check:
self.next_stale_check = now + 300
self.clear_stale_sessions()
gid = int(session.start_time - self.start_time) // 900 gid = int(session.start_time - self.start_time) // 900
self.groups[gid].append(session) self.groups[gid].append(session)
self.sessions[session] = gid self.sessions[session] = gid
@ -400,6 +369,7 @@ class Controller(util.LoggedClass):
def remove_session(self, session): def remove_session(self, session):
'''Remove a session from our sessions list if there.''' '''Remove a session from our sessions list if there.'''
session.items_future.cancel()
if session in self.sessions: if session in self.sessions:
gid = self.sessions.pop(session) gid = self.sessions.pop(session)
assert gid in self.groups assert gid in self.groups

View File

@ -10,6 +10,7 @@ daemon.'''
import asyncio import asyncio
import json import json
import time
import traceback import traceback
import aiohttp import aiohttp
@ -38,6 +39,8 @@ class Daemon(util.LoggedClass):
# Limit concurrent RPC calls to this number. # Limit concurrent RPC calls to this number.
# See DEFAULT_HTTP_WORKQUEUE in bitcoind, which is typically 16 # See DEFAULT_HTTP_WORKQUEUE in bitcoind, which is typically 16
self.workqueue_semaphore = asyncio.Semaphore(value=10) self.workqueue_semaphore = asyncio.Semaphore(value=10)
self.down = False
self.last_error_time = 0
def set_urls(self, urls): def set_urls(self, urls):
'''Set the URLS to the given list, and switch to the first one.''' '''Set the URLS to the given list, and switch to the first one.'''
@ -65,48 +68,56 @@ class Daemon(util.LoggedClass):
return True return True
return False return False
async def _send_data(self, data):
async with self.workqueue_semaphore:
async with aiohttp.ClientSession() as session:
async with session.post(self.url(), data=data) as resp:
# If bitcoind can't find a tx, for some reason
# it returns 500 but fills out the JSON.
# Should still return 200 IMO.
if resp.status in (200, 500):
return await resp.json()
return (resp.status, resp.reason)
async def _send(self, payload, processor): async def _send(self, payload, processor):
'''Send a payload to be converted to JSON. '''Send a payload to be converted to JSON.
Handles temporary connection issues. Daemon reponse errors Handles temporary connection issues. Daemon reponse errors
are raise through DaemonError. are raise through DaemonError.
''' '''
self.prior_msg = None def log_error(error):
self.skip_count = None self.down = True
now = time.time()
def log_error(msg, skip_once=False): prior_time = self.last_error_time
if skip_once and self.skip_count is None: if now - prior_time > 60:
self.skip_count = 1 self.last_error_time = now
if msg != self.prior_msg or self.skip_count == 0: if prior_time and self.failover():
self.skip_count = 10 secs = 0
self.prior_msg = msg else:
self.logger.error('{} Retrying between sleeps...' self.logger.error('{} Retrying occasionally...'
.format(msg)) .format(error))
self.skip_count -= 1
data = json.dumps(payload) data = json.dumps(payload)
secs = 1 secs = 1
max_secs = 16 max_secs = 4
while True: while True:
try: try:
async with self.workqueue_semaphore: result = await self._send_data(data)
async with aiohttp.post(self.url(), data=data) as resp: if not isinstance(result, tuple):
# If bitcoind can't find a tx, for some reason result = processor(result)
# it returns 500 but fills out the JSON. if self.down:
# Should still return 200 IMO. self.down = False
if resp.status in (200, 500): self.last_error_time = 0
if self.prior_msg: self.logger.info('connection restored')
self.logger.info('connection restored') return result
result = processor(await resp.json())
return result
log_error('HTTP error code {:d}: {}' log_error('HTTP error code {:d}: {}'
.format(resp.status, resp.reason)) .format(result[0], result[1]))
except asyncio.TimeoutError: except asyncio.TimeoutError:
log_error('timeout error.', skip_once=True) log_error('timeout error.')
except aiohttp.ClientHttpProcessingError: except aiohttp.ClientHttpProcessingError:
log_error('HTTP error.', skip_once=True) log_error('HTTP error.')
except aiohttp.ServerDisconnectedError: except aiohttp.ServerDisconnectedError:
log_error('disconnected.', skip_once=True) log_error('disconnected.')
except aiohttp.ClientConnectionError: except aiohttp.ClientConnectionError:
log_error('connection problem - is your daemon running?') log_error('connection problem - is your daemon running?')
except self.DaemonWarmingUpError: except self.DaemonWarmingUpError:
@ -116,11 +127,8 @@ class Daemon(util.LoggedClass):
except Exception: except Exception:
self.log_error(traceback.format_exc()) self.log_error(traceback.format_exc())
if secs >= max_secs and self.failover(): await asyncio.sleep(secs)
secs = 1 secs = min(max_secs, secs * 2, 1)
else:
await asyncio.sleep(secs)
secs = min(max_secs, secs * 2)
def logged_url(self, url=None): def logged_url(self, url=None):
'''The host and port part, for logging.''' '''The host and port part, for logging.'''

View File

@ -59,13 +59,16 @@ class PeerSession(JSONSession):
self.failed = False self.failed = False
self.log_prefix = '[{}] '.format(self.peer) self.log_prefix = '[{}] '.format(self.peer)
def have_pending_items(self): async def wait_on_items(self):
self.peer_mgr.ensure_future(self.process_pending_items()) while True:
await self.items_event.wait()
await self.process_pending_items()
def connection_made(self, transport): def connection_made(self, transport):
'''Handle an incoming client connection.''' '''Handle an incoming client connection.'''
super().connection_made(transport) super().connection_made(transport)
self.log_prefix = '[{}] '.format(str(self.peer)[:25]) self.log_prefix = '[{}] '.format(str(self.peer)[:25])
self.future = self.peer_mgr.ensure_future(self.wait_on_items())
# Update IP address # Update IP address
if not self.peer.is_tor: if not self.peer.is_tor:
@ -82,6 +85,7 @@ class PeerSession(JSONSession):
def connection_lost(self, exc): def connection_lost(self, exc):
'''Handle disconnection.''' '''Handle disconnection.'''
super().connection_lost(exc) super().connection_lost(exc)
self.future.cancel()
self.peer_mgr.connection_lost(self) self.peer_mgr.connection_lost(self)
def on_peers_subscribe(self, result, error): def on_peers_subscribe(self, result, error):
@ -306,8 +310,8 @@ class PeerManager(util.LoggedClass):
'''Returns the server peers as a list of (ip, host, details) tuples. '''Returns the server peers as a list of (ip, host, details) tuples.
We return all peers we've connected to in the last day. We return all peers we've connected to in the last day.
Additionally, if we don't have onion routing, we return up to Additionally, if we don't have onion routing, we return a few
three randomly selected onion servers. hard-coded onion servers.
''' '''
cutoff = time.time() - STALE_SECS cutoff = time.time() - STALE_SECS
recent = [peer for peer in self.peers recent = [peer for peer in self.peers

View File

@ -47,11 +47,6 @@ class SessionBase(JSONSession):
self.bw_used = 0 self.bw_used = 0
self.peer_added = False self.peer_added = False
def have_pending_items(self):
'''Called each time the pending item queue goes from empty to having
one item.'''
self.controller.enqueue_session(self)
def close_connection(self): def close_connection(self):
'''Call this to close the connection.''' '''Call this to close the connection.'''
self.close_time = time.time() self.close_time = time.time()