From 37f1e3bd95eebbe44eb3463088f4e2747833e88f Mon Sep 17 00:00:00 2001 From: Janus Date: Tue, 19 Dec 2017 15:07:34 +0100 Subject: [PATCH] asyncio: more robust network connection handling, shorter timeouts --- lib/interface.py | 4 +- lib/network.py | 126 +++++++++++++++++++++++++++-------------------- 2 files changed, 74 insertions(+), 56 deletions(-) diff --git a/lib/interface.py b/lib/interface.py index 64629054..a6299994 100644 --- a/lib/interface.py +++ b/lib/interface.py @@ -140,12 +140,12 @@ class Interface(util.PrintError): if not self.is_running(): return transport.close() else: - reader, writer = await asyncio.wait_for(self.conn_coro(context), 5) + reader, writer = await asyncio.wait_for(self.conn_coro(context), 3) dercert = writer.get_extra_info('ssl_object').getpeercert(True) writer.close() except OSError as e: # not ConnectionError because we need socket.gaierror too if self.is_running(): - print("Exception in _save_certificate", type(e)) + print(self.server, "Exception in _save_certificate", type(e)) return except TimeoutError: return diff --git a/lib/network.py b/lib/network.py index c27fa322..34a98e66 100644 --- a/lib/network.py +++ b/lib/network.py @@ -20,6 +20,7 @@ # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. 
+import collections from functools import partial import time import queue @@ -220,8 +221,6 @@ class Network(util.DaemonThread): self.interface = None self.interfaces = {} self.auto_connect = self.config.get('auto_connect', True) - self.connecting = set() - self.proxy = None def register_callback(self, callback, events): with self.lock: @@ -374,15 +373,16 @@ class Network(util.DaemonThread): return out async def start_interface(self, server): - if (not server in self.interfaces and not server in self.connecting): - if server == self.default_server: - self.print_error("connecting to %s as new interface" % server) - self.set_status('connecting') - self.connecting.add(server) - return await self.new_interface(server) + assert not self.connecting[server].locked() + async with self.connecting[server]: + if (not server in self.interfaces): + if server == self.default_server: + self.print_error("connecting to %s as new interface" % server) + self.set_status('connecting') + return await self.new_interface(server) async def start_random_interface(self): - exclude_set = self.disconnected_servers.union(set(self.interfaces)) + exclude_set = self.disconnected_servers.union(set(self.interfaces.keys())) server = pick_random_server(self.get_servers(), self.protocol, exclude_set) if server: return await self.start_interface(server) @@ -396,7 +396,7 @@ class Network(util.DaemonThread): async def start_network(self, protocol, proxy): self.stopped = False assert not self.interface and not self.interfaces - assert not self.connecting + assert all(not i.locked() for i in self.connecting.values()) self.print_error('starting network') self.disconnected_servers = set([]) self.protocol = protocol @@ -406,13 +406,21 @@ class Network(util.DaemonThread): async def stop_network(self): self.stopped = True self.print_error("stopping network") - list_with_main = ([self.interface] if self.interface else []) - for num, interface in enumerate(list(self.interfaces.values()) + list_with_main): + async 
def stop(interface): + while True: + try: + await asyncio.wait_for(asyncio.shield(self.connection_down(interface.server)), 1.5) + break + except TimeoutError: + print("close_interface too slow...", interface.server) try: - await asyncio.wait_for(asyncio.shield(self.close_interface(interface)), 5) - await asyncio.wait_for(asyncio.shield(interface.future), 5) + print(interface.server, await asyncio.wait_for(asyncio.shield(interface.future), 3)) except TimeoutError: - print("close_interface too slow...") + print("interface future too slow", interface.server) + if self.interface: + await stop(self.interface) + while self.interfaces: + await stop(next(iter(self.interfaces.values()))) await asyncio.wait_for(asyncio.shield(self.process_pending_sends_job), 5) assert self.interface is None for i in range(100): @@ -422,7 +430,8 @@ class Network(util.DaemonThread): await asyncio.sleep(0.1) if self.interfaces: assert False, "interfaces not empty after waiting: " + repr(self.interfaces) - self.connecting = set() + for i in self.connecting.values(): + assert not i.locked() # called from the Qt thread def set_parameters(self, host, port, protocol, proxy, auto_connect): @@ -510,13 +519,11 @@ class Network(util.DaemonThread): self.interfaces.pop(interface.server) if interface.server == self.default_server: self.interface = None - while True: + if interface.jobs: for i in interface.jobs: - if not i.done(): - await asyncio.sleep(0.1) - continue - break - assert interface.boot_job.done() + asyncio.wait_for(i, 3) + assert interface.boot_job + await asyncio.wait_for(asyncio.shield(interface.boot_job), 3) interface.close() def add_recent_server(self, server): @@ -699,16 +706,20 @@ class Network(util.DaemonThread): async def connection_down(self, server): '''A connection to server either went down, or was never made. 
We distinguish by whether it is in self.interfaces.''' - self.print_error("connection down", server) - self.disconnected_servers.add(server) - if server == self.default_server: - self.set_status('disconnected') - if server in self.interfaces: - await self.close_interface(self.interfaces[server]) - self.notify('interfaces') - for b in self.blockchains.values(): - if b.catch_up == server: - b.catch_up = None + async with self.connecting[server]: + if server in self.disconnected_servers: + self.print_error("already disconnected", server) + return + self.print_error("connection down", server) + self.disconnected_servers.add(server) + if server == self.default_server: + self.set_status('disconnected') + if server in self.interfaces: + await self.close_interface(self.interfaces[server]) + self.notify('interfaces') + for b in self.blockchains.values(): + if b.catch_up == server: + b.catch_up = None async def new_interface(self, server): # todo: get tip first, then decide which checkpoint to use. 
@@ -723,7 +734,8 @@ class Network(util.DaemonThread): interface.jobs = None interface.boot_job = None self.boot_interface(interface) - #self.interfaces[server] = interface + assert server not in self.interfaces + self.interfaces[server] = interface return interface async def request_chunk(self, interface, idx): @@ -961,26 +973,28 @@ class Network(util.DaemonThread): async def job(): try: await self.queue_request('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION], interface) - def rem(): - if not self.stopped: - self.connecting.remove(interface.server) if not await interface.send_request(): - rem() - await self.connection_down(interface.server) + asyncio.ensure_future(self.connection_down(interface.server)) + interface.future.set_result("could not send request") + return + if self.stopped: + asyncio.ensure_future(self.connection_down(interface.server)) + interface.future.set_result("stopped after sending request") return - if self.stopped: return try: await asyncio.wait_for(interface.get_response(), 1) except TimeoutError: - rem() - await self.connection_down(interface.server) + asyncio.ensure_future(self.connection_down(interface.server)) + interface.future.set_result("timeout while getting response") return - if self.stopped: return - rem() - self.interfaces[interface.server] = interface + if self.stopped: + asyncio.ensure_future(self.connection_down(interface.server)) + interface.future.set_result("stopped after getting response") + return + #self.interfaces[interface.server] = interface await self.queue_request('blockchain.headers.subscribe', [], interface) if interface.server == self.default_server: - await self.switch_to_interface(interface.server) + await asyncio.wait_for(self.switch_to_interface(interface.server), 1) interface.jobs = [asyncio.ensure_future(x) for x in [self.make_ping_job(interface), self.make_send_requests_job(interface), self.make_process_responses_job(interface)]] def cb(num, fut): try: @@ -991,6 +1005,8 @@ class 
Network(util.DaemonThread): if not interface.future.done(): interface.future.set_result(str(num) + " done") for num, i in enumerate(interface.jobs): i.add_done_callback(partial(cb, num)) + interface.future.set_result("finished") + return #self.notify('interfaces') except GeneratorExit: print(interface.server, "GENERATOR EXIT") @@ -1037,7 +1053,7 @@ class Network(util.DaemonThread): now = time.time() # nodes - if len(self.interfaces) + len(self.connecting) < self.num_server: + if len(self.interfaces) + sum((1 if x.locked() else 0) for x in self.connecting.values()) < self.num_server: await self.start_random_interface() if now - self.nodes_retry_time > NODES_RETRY_INTERVAL: self.print_error('network: retrying connections') @@ -1050,12 +1066,13 @@ class Network(util.DaemonThread): if not self.is_connecting(): await self.switch_to_random_interface() else: - if self.default_server in self.disconnected_servers: - if now - self.server_retry_time > SERVER_RETRY_INTERVAL: - self.disconnected_servers.remove(self.default_server) - self.server_retry_time = now - else: - await self.switch_to_interface(self.default_server) + async with self.connecting[self.default_server]: + if self.default_server in self.disconnected_servers: + if now - self.server_retry_time > SERVER_RETRY_INTERVAL: + self.disconnected_servers.remove(self.default_server) + self.server_retry_time = now + else: + await self.switch_to_interface(self.default_server) else: if self.config.is_fee_estimates_update_required(): await self.request_fee_estimates() @@ -1067,6 +1084,7 @@ class Network(util.DaemonThread): self.loop = loop # so we store it in the instance too self.init_headers_file() self.pending_sends = asyncio.Queue() + self.connecting = collections.defaultdict(asyncio.Lock) async def job(): try: @@ -1099,7 +1117,7 @@ class Network(util.DaemonThread): for i in asyncio.Task.all_tasks(): if asyncio.Task.current_task() == i: continue try: - await asyncio.wait_for(asyncio.shield(i), 5) + await 
asyncio.wait_for(asyncio.shield(i), 2) except TimeoutError: print("TOO SLOW TO SHUT DOWN, CANCELLING", i) i.cancel() @@ -1114,7 +1132,7 @@ class Network(util.DaemonThread): if not height: return if height < self.max_checkpoint(): - await self.connection_down(interface) + await self.connection_down(interface.server) return interface.tip_header = header interface.tip = height