From 683205a3fadaa3974b057bdf63830a87ad574abe Mon Sep 17 00:00:00 2001 From: Janus Date: Fri, 15 Dec 2017 16:59:24 +0100 Subject: [PATCH] asyncio: warn if sending takes too long, only output errors if not shutting down --- lib/interface.py | 41 ++++++++++++++++++++++++----------------- lib/network.py | 37 ++++++++++++++++++++++--------------- 2 files changed, 46 insertions(+), 32 deletions(-) diff --git a/lib/interface.py b/lib/interface.py index c1426d09..7cbcfa20 100644 --- a/lib/interface.py +++ b/lib/interface.py @@ -68,7 +68,8 @@ class Interface(util.PrintError): - Member variable server. """ - def __init__(self, server, config_path, proxy_config): + def __init__(self, server, config_path, proxy_config, is_running): + self.is_running = is_running self.addr = self.auth = None if proxy_config is not None: if proxy_config["mode"] == "socks5": @@ -119,7 +120,7 @@ class Interface(util.PrintError): fut.set_result(protocol._sslpipe.ssl_object.getpeercert(True)) except BaseException as e: fut.set_exception(e) - while True: + while self.is_running(): fut = asyncio.Future() asyncio.ensure_future(job(fut)) try: @@ -133,18 +134,19 @@ class Interface(util.PrintError): await asyncio.sleep(1) continue except: - traceback.print_exc() + if self.is_running(): traceback.print_exc() continue break - print("done sleeping") + if not self.is_running(): return transport.close() else: reader, writer = await asyncio.wait_for(self.conn_coro(context), 5) dercert = writer.get_extra_info('ssl_object').getpeercert(True) writer.close() except ConnectionError: - traceback.print_exc() - print("Previous exception from _save_certificate") + if self.is_running(): + traceback.print_exc() + print("Previous exception from _save_certificate") return except TimeoutError: return @@ -162,7 +164,7 @@ class Interface(util.PrintError): async def _get_read_write(self): async with self.lock: if self.reader is not None and self.writer is not None: - return self.reader, self.writer + return self.reader, self.writer, True if self.use_ssl: cert_path = os.path.join(self.config_path, 'certs', self.host) if not os.path.exists(cert_path): @@ -196,19 +198,24 @@ class Interface(util.PrintError): print("TimeoutError after getting certificate successfully...") raise except BaseException as e: - traceback.print_exc() - print("Previous exception will now be reraised") + if self.is_running(): + traceback.print_exc() + print("Previous exception will now be reraised") raise e if self.use_ssl and is_new: self.print_error("saving new certificate for", self.host) os.rename(temporary_path, cert_path) - return self.reader, self.writer + return self.reader, self.writer, False async def send_all(self, list_of_requests): - _, w = await self._get_read_write() + _, w, usedExisting = await self._get_read_write() + starttime = time.time() for i in list_of_requests: w.write(json.dumps(i).encode("ascii") + b"\n") await w.drain() + if time.time() - starttime > 2.5: + print("send_all: sending is taking too long. Used existing connection: ", usedExisting) + raise ConnectionError("sending is taking too long") def close(self): if self.writer: @@ -228,10 +235,10 @@ class Interface(util.PrintError): self.buf = self.buf[pos+1:] self.last_action = time.time() return obj - async def get(self, is_running): - reader, _ = await self._get_read_write() + async def get(self): + reader, _, _ = await self._get_read_write() - while is_running(): + while self.is_running(): tried = self._try_extract() if tried: return tried temp = io.BytesIO() @@ -299,7 +306,7 @@ class Interface(util.PrintError): return True return False - async def get_response(self, is_running): + async def get_response(self): '''Call if there is data available on the socket. Returns a list of (request, response) pairs. Notifications are singleton unsolicited responses presumably as a result of prior @@ -308,11 +315,11 @@ class Interface(util.PrintError): corresponding request. If the connection was closed remotely or the remote server is misbehaving, a (None, None) will appear. ''' - response = await self.get(is_running) + response = await self.get() if not type(response) is dict: if response is None: self.closed_remotely = True - if is_running(): + if self.is_running(): self.print_error("connection closed remotely") return None, None if self.debug: diff --git a/lib/network.py b/lib/network.py index 5551b1c8..eac5867f 100644 --- a/lib/network.py +++ b/lib/network.py @@ -565,7 +565,7 @@ class Network(util.DaemonThread): async def process_responses(self, interface): while self.is_running(): - request, response = await interface.get_response(lambda: self.is_running()) + request, response = await interface.get_response() if request: method, params, message_id = request k = self.get_index(method, params) @@ -648,7 +648,6 @@ class Network(util.DaemonThread): # Requests needs connectivity. If we don't have an interface, # we cannot process them. if not self.interface: - print("no interface, returning") await asyncio.sleep(1) return @@ -699,7 +698,7 @@ class Network(util.DaemonThread): async def new_interface(self, server): # todo: get tip first, then decide which checkpoint to use. self.add_recent_server(server) - interface = Interface(server, self.config.path, self.proxy) + interface = Interface(server, self.config.path, self.proxy, lambda: self.is_running()) interface.future = asyncio.Future() interface.blockchain = None interface.tip_header = None @@ -895,8 +894,9 @@ class Network(util.DaemonThread): except GeneratorExit: pass except: - traceback.print_exc() - print("FATAL ERROR ^^^") + if self.is_running(): + traceback.print_exc() + print("FATAL ERROR ^^^") return asyncio.ensure_future(job()) def make_process_responses_job(self, interface): @@ -909,8 +909,9 @@ class Network(util.DaemonThread): await self.connection_down(interface.server) print("OS error, connection downed") except BaseException: - traceback.print_exc() - print("FATAL ERROR in process_responses") + if self.is_running(): + traceback.print_exc() + print("FATAL ERROR in process_responses") return asyncio.ensure_future(job()) def make_process_pending_sends_job(self): @@ -924,8 +925,9 @@ class Network(util.DaemonThread): #except CancelledError: # pass except BaseException as e: - traceback.print_exc() - print("FATAL ERROR in process_pending_sends") + if self.is_running(): + traceback.print_exc() + print("FATAL ERROR in process_pending_sends") return asyncio.ensure_future(job()) def init_headers_file(self): @@ -970,9 +972,10 @@ class Network(util.DaemonThread): print(interface.server, "GENERATOR EXIT") pass except BaseException as e: - traceback.print_exc() - print("FATAL ERROR in boot_interface") - raise e + if self.is_running(): + traceback.print_exc() + print("FATAL ERROR in boot_interface") + raise e interface.boot_job = asyncio.ensure_future(job()) def make_ping_job(self, interface): @@ -991,8 +994,9 @@ class Network(util.DaemonThread): except GeneratorExit: pass except: - traceback.print_exc() - print("FATAL ERROR in ping_job") + if self.is_running(): + traceback.print_exc() + print("FATAL ERROR in ping_job") return asyncio.ensure_future(job()) async def maintain_interfaces(self): @@ -1059,7 +1063,10 @@ class Network(util.DaemonThread): for i in asyncio.Task.all_tasks(): if asyncio.Task.current_task() == i: continue try: - await i + await asyncio.wait_for(i, 5) + except TimeoutError: + print("TOO SLOW TO SHUT DOWN, CANCELLING", i) + i.cancel() except CancelledError: pass future.set_result("run_async done")