diff --git a/lib/interface.py b/lib/interface.py index 11ab8196..a9766288 100644 --- a/lib/interface.py +++ b/lib/interface.py @@ -68,8 +68,10 @@ class Interface(util.PrintError): - Member variable server. """ - def __init__(self, server, config_path, proxy_config, is_running): - self.is_running = is_running + def __init__(self, server, config_path, proxy_config, is_running, exception_handler): + self.error_future = asyncio.Future() + self.error_future.add_done_callback(exception_handler) + self.is_running = lambda: is_running() and not self.error_future.done() self.addr = self.auth = None if proxy_config is not None: if proxy_config["mode"] == "socks5": @@ -145,6 +147,11 @@ class Interface(util.PrintError): reader, writer = await asyncio.wait_for(self.conn_coro(context), 3) dercert = writer.get_extra_info('ssl_object').getpeercert(True) writer.close() + except OSError as e: # not ConnectionError because we need socket.gaierror too + if self.is_running(): + self.print_error(self.server, "Exception in _save_certificate", type(e)) + if not self.error_future.done(): self.error_future.set_result(e) + return except TimeoutError: return assert dercert @@ -296,6 +303,7 @@ class Interface(util.PrintError): def has_timed_out(self): '''Returns True if the interface has timed out.''' + if self.error_future.done(): return True if (self.unanswered_requests and time.time() - self.request_time > 10 and self.idle_time() > 10): self.print_error("timeout", len(self.unanswered_requests)) diff --git a/lib/network.py b/lib/network.py index 186dce0e..e8adbaee 100644 --- a/lib/network.py +++ b/lib/network.py @@ -167,7 +167,6 @@ class Network(util.DaemonThread): """ def __init__(self, config=None): - self.send_requests_jobs = {} self.disconnected_servers = {} self.connecting = set() self.stopped = True @@ -419,27 +418,14 @@ class Network(util.DaemonThread): self.print_error("stopping network") async def stop(interface): await self.connection_down(interface.server, "stopping network") - await asyncio.wait_for(asyncio.shield(interface.future), 3) + await interface.future stopped_this_time = set() while self.interfaces: do_next = next(iter(self.interfaces.values())) - assert do_next not in stopped_this_time - for i in self.disconnected_servers: - assert i not in self.interfaces.keys() - assert i != do_next.server stopped_this_time.add(do_next) await stop(do_next) - if self.interface: - assert self.interface.server in stopped_this_time, self.interface.server - await asyncio.wait_for(asyncio.shield(self.process_pending_sends_job), 5) - assert self.interface is None - for i in range(100): - if not self.interfaces: - break - else: - await asyncio.sleep(0.1) - if self.interfaces: - assert False, "interfaces not empty after waiting: " + repr(self.interfaces) + self.process_pending_sends_job.cancel() + await self.process_pending_sends_job # called from the Qt thread def set_parameters(self, host, port, protocol, proxy, auto_connect): @@ -516,8 +502,8 @@ class Network(util.DaemonThread): await self.start_interface(server) return i = self.interfaces[server] - if self.interface != i: - self.print_error("switching to", server) + if self.interface != i and self.num_server != 0: + self.print_error("switching to", server, "from", self.interface.server if self.interface else "None") # stop any current interface in order to terminate subscriptions # fixme: we don't want to close headers sub #self.close_interface(self.interface) @@ -535,12 +521,7 @@ class Network(util.DaemonThread): self.interface = None if interface.jobs: await asyncio.gather(*interface.jobs) - assert interface.boot_job - try: - await asyncio.wait_for(asyncio.shield(interface.boot_job), 6) # longer than any timeout while connecting - except TimeoutError: - self.print_error("taking too long", interface.server) - raise + await interface.boot_job interface.close() def add_recent_server(self, server): @@ -738,11 +719,11 @@ class Network(util.DaemonThread): raise Exception("already disconnected " + server + " because " + repr(self.disconnected_servers[server]) + ". new reason: " + repr(reason)) except: traceback.print_exc() - sys.exit(1) return - self.print_error("connection down", server) - assert not self.send_requests_jobs[server].cancelled() - self.send_requests_jobs[server].cancel() + self.print_error("connection down", server, reason) + for i in self.interfaces[server].jobs: + assert not i.cancelled() + i.cancel() self.disconnected_servers[server] = reason if server == self.default_server: self.set_status('disconnected') @@ -756,7 +737,13 @@ class Network(util.DaemonThread): def new_interface(self, server): # todo: get tip first, then decide which checkpoint to use. self.add_recent_server(server) - interface = Interface(server, self.config.path, self.proxy, lambda: not self.stopped and server in self.interfaces) + def interface_exception_handler(exception_future): + try: + raise exception_future.result() + except Exception as e: + print(type(e), " handled in new_interface") + asyncio.ensure_future(self.connection_down(server, "error in interface")) + interface = Interface(server, self.config.path, self.proxy, lambda: not self.stopped and server in self.interfaces, interface_exception_handler) interface.future = asyncio.Future() interface.blockchain = None interface.tip_header = None @@ -937,28 +924,28 @@ class Network(util.DaemonThread): try: while True: await interface.send_request() - except: - if interface.is_running(): + except CancelledError: + pass + except Exception as e: + print("send requests exp", str(e)) + if interface.is_running() or self.num_server == 0: traceback.print_exc() self.print_error("FATAL ERROR ^^^") + await self.connection_down(interface.server, "exp while send_request") - job = asyncio.ensure_future(job()) - - if interface.server in self.send_requests_jobs: assert self.send_requests_jobs[interface.server].done() - self.send_requests_jobs[interface.server] = job - return job + return asyncio.ensure_future(job()) def make_process_responses_job(self, interface): async def job(): try: await self.process_responses(interface) - except GeneratorExit: - pass except OSError: await self.connection_down(interface.server, "OSError in process_responses") self.print_error("OS error, connection downed") - except BaseException: - if interface.is_running(): + except CancelledError: + pass + except: + if interface.is_running() or self.num_server == 0: traceback.print_exc() self.print_error("FATAL ERROR in process_responses") return asyncio.ensure_future(job()) @@ -966,15 +953,12 @@ class Network(util.DaemonThread): def make_process_pending_sends_job(self): async def job(): try: - while not self.stopped: - try: - await asyncio.wait_for(asyncio.shield(self.process_pending_sends()), 1) - except TimeoutError: - continue - #except CancelledError: - # pass - except BaseException as e: - if not self.stopped: + while True: + await self.process_pending_sends() + except CancelledError: + pass + except: + if not self.stopped or self.num_server == 0: traceback.print_exc() self.print_error("FATAL ERROR in process_pending_sends") return asyncio.ensure_future(job()) @@ -994,13 +978,13 @@ class Network(util.DaemonThread): def boot_interface(self, interface): async def job(): try: - send_requests_job = self.make_send_requests_job(interface) + interface.jobs = [self.make_send_requests_job(interface)] # we need this job to process the request queued below try: await asyncio.wait_for(self.queue_request('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION], interface), 1) except TimeoutError: - if interface.is_running(): - asyncio.ensure_future(self.connection_down(interface.server, "send_request timeout in boot_interface")) - interface.future.set_result("could not send request") + asyncio.ensure_future(self.connection_down(interface.server, "couldn't send initial version")) + interface.future.set_result("couldn't send initial version") + return if not interface.is_running(): interface.future.set_result("stopped after sending request") @@ -1008,8 +992,7 @@ class Network(util.DaemonThread): try: await asyncio.wait_for(interface.get_response(), 1) except TimeoutError: - if interface.is_running(): - asyncio.ensure_future(self.connection_down(interface.server, "timeout in boot_interface while getting response")) + asyncio.ensure_future(self.connection_down(interface.server, "timeout in boot_interface while getting response")) interface.future.set_result("timeout while getting response") return if not interface.is_running(): @@ -1019,7 +1002,7 @@ class Network(util.DaemonThread): await self.queue_request('blockchain.headers.subscribe', [], interface) if interface.server == self.default_server: await asyncio.wait_for(self.switch_to_interface(interface.server), 1) - interface.jobs = [send_requests_job, self.make_process_responses_job(interface)] + interface.jobs += [self.make_process_responses_job(interface)] gathered = asyncio.gather(*interface.jobs) while interface.is_running(): try: @@ -1029,17 +1012,15 @@ class Network(util.DaemonThread): if interface.has_timed_out(): self.print_error(interface.server, "timed out") await self.connection_down(interface.server, "time out in ping_job") + break elif interface.ping_required(): params = [ELECTRUM_VERSION, PROTOCOL_VERSION] await self.queue_request('server.version', params, interface) interface.future.set_result("finished") return #self.notify('interfaces') - except GeneratorExit: - self.print_error(interface.server, "GENERATOR EXIT") - pass except BaseException as e: - if interface.is_running(): + if interface.is_running() or self.num_server == 0: traceback.print_exc() self.print_error("FATAL ERROR in boot_interface") raise e @@ -1122,15 +1103,6 @@ class Network(util.DaemonThread): self.run_jobs() await self.stop_network() self.on_stop() - for i in asyncio.Task.all_tasks(): - if asyncio.Task.current_task() == i: continue - try: - await asyncio.wait_for(asyncio.shield(i), 2) - except TimeoutError: - self.print_error("TOO SLOW TO SHUT DOWN, CANCELLING", i) - i.cancel() - except CancelledError: - pass future.set_result("run_async done") except BaseException as e: future.set_exception(e)