asyncio: improve shutdown latency, avoid 1.5 second timeouts

This commit is contained in:
Janus 2018-03-09 15:12:37 +01:00
parent fc03ed1e2b
commit 072058e93c
3 changed files with 30 additions and 28 deletions

View File

@ -145,10 +145,6 @@ class Interface(util.PrintError):
reader, writer = await asyncio.wait_for(self.conn_coro(context), 3) reader, writer = await asyncio.wait_for(self.conn_coro(context), 3)
dercert = writer.get_extra_info('ssl_object').getpeercert(True) dercert = writer.get_extra_info('ssl_object').getpeercert(True)
writer.close() writer.close()
except OSError as e: # not ConnectionError because we need socket.gaierror too
if self.is_running():
self.print_error(self.server, "Exception in _save_certificate", type(e))
return
except TimeoutError: except TimeoutError:
return return
assert dercert assert dercert
@ -263,6 +259,7 @@ class Interface(util.PrintError):
''' '''
self.request_time = time.time() self.request_time = time.time()
await self.unsent_requests.put((self.request_time, args)) await self.unsent_requests.put((self.request_time, args))
await self.unsent_requests.join()
def num_requests(self): def num_requests(self):
'''Keep unanswered requests below 100''' '''Keep unanswered requests below 100'''
@ -270,25 +267,22 @@ class Interface(util.PrintError):
return min(n, self.unsent_requests.qsize()) return min(n, self.unsent_requests.qsize())
async def send_request(self): async def send_request(self):
'''Sends queued requests. Returns False on failure.''' '''Sends a queued request.'''
make_dict = lambda m, p, i: {'method': m, 'params': p, 'id': i} make_dict = lambda m, p, i: {'method': m, 'params': p, 'id': i}
n = self.num_requests() n = self.num_requests()
try: prio, request = await self.unsent_requests.get()
prio, request = await asyncio.wait_for(self.unsent_requests.get(), 1.5)
except TimeoutError:
return False
try: try:
await self.send_all([make_dict(*request)]) await self.send_all([make_dict(*request)])
except (SocksError, OSError, TimeoutError) as e: except (SocksError, OSError, TimeoutError) as e:
if type(e) is SocksError: if type(e) is SocksError:
self.print_error(e) self.print_error(e)
await self.unsent_requests.put((prio, request)) await self.unsent_requests.put((prio, request))
return False return
self.unsent_requests.task_done()
if self.debug: if self.debug:
self.print_error("-->", request) self.print_error("-->", request)
self.unanswered_requests[request[2]] = request self.unanswered_requests[request[2]] = request
self.last_action = time.time() self.last_action = time.time()
return True
def ping_required(self): def ping_required(self):
'''Maintains time since last ping. Returns True if a ping should '''Maintains time since last ping. Returns True if a ping should

View File

@ -167,6 +167,7 @@ class Network(util.DaemonThread):
""" """
def __init__(self, config=None): def __init__(self, config=None):
self.send_requests_jobs = {}
self.disconnected_servers = {} self.disconnected_servers = {}
self.connecting = set() self.connecting = set()
self.stopped = True self.stopped = True
@ -533,8 +534,7 @@ class Network(util.DaemonThread):
if interface.server == self.default_server: if interface.server == self.default_server:
self.interface = None self.interface = None
if interface.jobs: if interface.jobs:
for i in interface.jobs: await asyncio.gather(*interface.jobs)
asyncio.wait_for(i, 3)
assert interface.boot_job assert interface.boot_job
try: try:
await asyncio.wait_for(asyncio.shield(interface.boot_job), 6) # longer than any timeout while connecting await asyncio.wait_for(asyncio.shield(interface.boot_job), 6) # longer than any timeout while connecting
@ -741,6 +741,8 @@ class Network(util.DaemonThread):
sys.exit(1) sys.exit(1)
return return
self.print_error("connection down", server) self.print_error("connection down", server)
assert not self.send_requests_jobs[server].cancelled()
self.send_requests_jobs[server].cancel()
self.disconnected_servers[server] = reason self.disconnected_servers[server] = reason
if server == self.default_server: if server == self.default_server:
self.set_status('disconnected') self.set_status('disconnected')
@ -933,20 +935,18 @@ class Network(util.DaemonThread):
def make_send_requests_job(self, interface): def make_send_requests_job(self, interface):
async def job(): async def job():
try: try:
while interface.is_running(): while True:
try: await interface.send_request()
result = await asyncio.wait_for(asyncio.shield(interface.send_request()), 1)
except TimeoutError:
continue
if not result and interface.is_running():
await self.connection_down(interface.server, "send_request returned false")
except GeneratorExit:
pass
except: except:
if interface.is_running(): if interface.is_running():
traceback.print_exc() traceback.print_exc()
self.print_error("FATAL ERROR ^^^") self.print_error("FATAL ERROR ^^^")
return asyncio.ensure_future(job())
job = asyncio.ensure_future(job())
if interface.server in self.send_requests_jobs: assert self.send_requests_jobs[interface.server].done()
self.send_requests_jobs[interface.server] = job
return job
def make_process_responses_job(self, interface): def make_process_responses_job(self, interface):
async def job(): async def job():
@ -994,10 +994,12 @@ class Network(util.DaemonThread):
def boot_interface(self, interface): def boot_interface(self, interface):
async def job(): async def job():
try: try:
await self.queue_request('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION], interface) send_requests_job = self.make_send_requests_job(interface)
if not await interface.send_request(): try:
await asyncio.wait_for(self.queue_request('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION], interface), 1)
except TimeoutError:
if interface.is_running(): if interface.is_running():
asyncio.ensure_future(self.connection_down(interface.server, "send_request false in boot_interface")) asyncio.ensure_future(self.connection_down(interface.server, "send_request timeout in boot_interface"))
interface.future.set_result("could not send request") interface.future.set_result("could not send request")
return return
if not interface.is_running(): if not interface.is_running():
@ -1017,7 +1019,7 @@ class Network(util.DaemonThread):
await self.queue_request('blockchain.headers.subscribe', [], interface) await self.queue_request('blockchain.headers.subscribe', [], interface)
if interface.server == self.default_server: if interface.server == self.default_server:
await asyncio.wait_for(self.switch_to_interface(interface.server), 1) await asyncio.wait_for(self.switch_to_interface(interface.server), 1)
interface.jobs = [asyncio.ensure_future(x) for x in [self.make_send_requests_job(interface), self.make_process_responses_job(interface)]] interface.jobs = [send_requests_job, self.make_process_responses_job(interface)]
gathered = asyncio.gather(*interface.jobs) gathered = asyncio.gather(*interface.jobs)
while interface.is_running(): while interface.is_running():
try: try:

View File

@ -228,7 +228,13 @@ class DaemonThread(threading.Thread, PrintError):
self.forever_coroutines_queue = asyncio.Queue() # making queue here because __init__ is called from non-network thread self.forever_coroutines_queue = asyncio.Queue() # making queue here because __init__ is called from non-network thread
self.loop = asyncio.get_event_loop() self.loop = asyncio.get_event_loop()
async def getFromQueueAndStart(): async def getFromQueueAndStart():
jobs = await self.forever_coroutines_queue.get() while True:
try:
jobs = await asyncio.wait_for(self.forever_coroutines_queue.get(), 1)
break
except asyncio.TimeoutError:
if not self.is_running(): break
continue
await asyncio.gather(*[i.run(self.is_running) for i in jobs]) await asyncio.gather(*[i.run(self.is_running) for i in jobs])
self.print_error("FOREVER JOBS DONE") self.print_error("FOREVER JOBS DONE")
self.forever_coroutines_task = asyncio.ensure_future(getFromQueueAndStart()) self.forever_coroutines_task = asyncio.ensure_future(getFromQueueAndStart())