asyncio: warn if sending takes too long, only output errors if not shutting down

This commit is contained in:
Janus 2017-12-15 16:59:24 +01:00
parent dcb0a24e6f
commit 683205a3fa
2 changed files with 46 additions and 32 deletions

View File

@ -68,7 +68,8 @@ class Interface(util.PrintError):
- Member variable server.
"""
def __init__(self, server, config_path, proxy_config):
def __init__(self, server, config_path, proxy_config, is_running):
self.is_running = is_running
self.addr = self.auth = None
if proxy_config is not None:
if proxy_config["mode"] == "socks5":
@ -119,7 +120,7 @@ class Interface(util.PrintError):
fut.set_result(protocol._sslpipe.ssl_object.getpeercert(True))
except BaseException as e:
fut.set_exception(e)
while True:
while self.is_running():
fut = asyncio.Future()
asyncio.ensure_future(job(fut))
try:
@ -133,18 +134,19 @@ class Interface(util.PrintError):
await asyncio.sleep(1)
continue
except:
traceback.print_exc()
if self.is_running(): traceback.print_exc()
continue
break
print("done sleeping")
if not self.is_running(): return
transport.close()
else:
reader, writer = await asyncio.wait_for(self.conn_coro(context), 5)
dercert = writer.get_extra_info('ssl_object').getpeercert(True)
writer.close()
except ConnectionError:
traceback.print_exc()
print("Previous exception from _save_certificate")
if self.is_running():
traceback.print_exc()
print("Previous exception from _save_certificate")
return
except TimeoutError:
return
@ -162,7 +164,7 @@ class Interface(util.PrintError):
async def _get_read_write(self):
async with self.lock:
if self.reader is not None and self.writer is not None:
return self.reader, self.writer
return self.reader, self.writer, True
if self.use_ssl:
cert_path = os.path.join(self.config_path, 'certs', self.host)
if not os.path.exists(cert_path):
@ -196,19 +198,24 @@ class Interface(util.PrintError):
print("TimeoutError after getting certificate successfully...")
raise
except BaseException as e:
traceback.print_exc()
print("Previous exception will now be reraised")
if self.is_running():
traceback.print_exc()
print("Previous exception will now be reraised")
raise e
if self.use_ssl and is_new:
self.print_error("saving new certificate for", self.host)
os.rename(temporary_path, cert_path)
return self.reader, self.writer
return self.reader, self.writer, False
async def send_all(self, list_of_requests):
_, w = await self._get_read_write()
_, w, usedExisting = await self._get_read_write()
starttime = time.time()
for i in list_of_requests:
w.write(json.dumps(i).encode("ascii") + b"\n")
await w.drain()
if time.time() - starttime > 2.5:
print("send_all: sending is taking too long. Used existing connection: ", usedExisting)
raise ConnectionError("sending is taking too long")
def close(self):
if self.writer:
@ -228,10 +235,10 @@ class Interface(util.PrintError):
self.buf = self.buf[pos+1:]
self.last_action = time.time()
return obj
async def get(self, is_running):
reader, _ = await self._get_read_write()
async def get(self):
reader, _, _ = await self._get_read_write()
while is_running():
while self.is_running():
tried = self._try_extract()
if tried: return tried
temp = io.BytesIO()
@ -299,7 +306,7 @@ class Interface(util.PrintError):
return True
return False
async def get_response(self, is_running):
async def get_response(self):
'''Call if there is data available on the socket. Returns a list of
(request, response) pairs. Notifications are singleton
unsolicited responses presumably as a result of prior
@ -308,11 +315,11 @@ class Interface(util.PrintError):
corresponding request. If the connection was closed remotely
or the remote server is misbehaving, a (None, None) will appear.
'''
response = await self.get(is_running)
response = await self.get()
if not type(response) is dict:
if response is None:
self.closed_remotely = True
if is_running():
if self.is_running():
self.print_error("connection closed remotely")
return None, None
if self.debug:

View File

@ -565,7 +565,7 @@ class Network(util.DaemonThread):
async def process_responses(self, interface):
while self.is_running():
request, response = await interface.get_response(lambda: self.is_running())
request, response = await interface.get_response()
if request:
method, params, message_id = request
k = self.get_index(method, params)
@ -648,7 +648,6 @@ class Network(util.DaemonThread):
# Requests needs connectivity. If we don't have an interface,
# we cannot process them.
if not self.interface:
print("no interface, returning")
await asyncio.sleep(1)
return
@ -699,7 +698,7 @@ class Network(util.DaemonThread):
async def new_interface(self, server):
# todo: get tip first, then decide which checkpoint to use.
self.add_recent_server(server)
interface = Interface(server, self.config.path, self.proxy)
interface = Interface(server, self.config.path, self.proxy, lambda: self.is_running())
interface.future = asyncio.Future()
interface.blockchain = None
interface.tip_header = None
@ -895,8 +894,9 @@ class Network(util.DaemonThread):
except GeneratorExit:
pass
except:
traceback.print_exc()
print("FATAL ERROR ^^^")
if self.is_running():
traceback.print_exc()
print("FATAL ERROR ^^^")
return asyncio.ensure_future(job())
def make_process_responses_job(self, interface):
@ -909,8 +909,9 @@ class Network(util.DaemonThread):
await self.connection_down(interface.server)
print("OS error, connection downed")
except BaseException:
traceback.print_exc()
print("FATAL ERROR in process_responses")
if self.is_running():
traceback.print_exc()
print("FATAL ERROR in process_responses")
return asyncio.ensure_future(job())
def make_process_pending_sends_job(self):
@ -924,8 +925,9 @@ class Network(util.DaemonThread):
#except CancelledError:
# pass
except BaseException as e:
traceback.print_exc()
print("FATAL ERROR in process_pending_sends")
if self.is_running():
traceback.print_exc()
print("FATAL ERROR in process_pending_sends")
return asyncio.ensure_future(job())
def init_headers_file(self):
@ -970,9 +972,10 @@ class Network(util.DaemonThread):
print(interface.server, "GENERATOR EXIT")
pass
except BaseException as e:
traceback.print_exc()
print("FATAL ERROR in boot_interface")
raise e
if self.is_running():
traceback.print_exc()
print("FATAL ERROR in boot_interface")
raise e
interface.boot_job = asyncio.ensure_future(job())
def make_ping_job(self, interface):
@ -991,8 +994,9 @@ class Network(util.DaemonThread):
except GeneratorExit:
pass
except:
traceback.print_exc()
print("FATAL ERROR in ping_job")
if self.is_running():
traceback.print_exc()
print("FATAL ERROR in ping_job")
return asyncio.ensure_future(job())
async def maintain_interfaces(self):
@ -1059,7 +1063,10 @@ class Network(util.DaemonThread):
for i in asyncio.Task.all_tasks():
if asyncio.Task.current_task() == i: continue
try:
await i
await asyncio.wait_for(i, 5)
except TimeoutError:
print("TOO SLOW TO SHUT DOWN, CANCELLING", i)
i.cancel()
except CancelledError:
pass
future.set_result("run_async done")