From b427f55cafa9694e39c302930fa9066cd86ca2e5 Mon Sep 17 00:00:00 2001 From: Janus Date: Mon, 12 Feb 2018 16:02:35 +0100 Subject: [PATCH] asyncio: use is_running of interface instead of global stop flag --- lib/network.py | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/lib/network.py b/lib/network.py index 954c4f7d..1e0f4b46 100644 --- a/lib/network.py +++ b/lib/network.py @@ -595,7 +595,7 @@ class Network(util.DaemonThread): return str(method) + (':' + str(params[0]) if params else '') async def process_responses(self, interface): - while not self.stopped: + while interface.is_running(): request, response = await interface.get_response() if request: method, params, message_id = request @@ -621,7 +621,7 @@ class Network(util.DaemonThread): self.subscribed_addresses.add(params[0]) else: if not response: # Closed remotely / misbehaving - if not self.stopped: await self.connection_down(interface.server, "no response in process responses") + if interface.is_running(): await self.connection_down(interface.server, "no response in process responses") return # Rewrite response shape to match subscription request response method = response.get('method') @@ -740,7 +740,7 @@ class Network(util.DaemonThread): async def new_interface(self, server): # todo: get tip first, then decide which checkpoint to use. self.add_recent_server(server) - interface = Interface(server, self.config.path, self.proxy, lambda: not self.stopped) + interface = Interface(server, self.config.path, self.proxy, lambda: not self.stopped and server not in self.interfaces) interface.future = asyncio.Future() interface.blockchain = None interface.tip_header = None @@ -924,17 +924,17 @@ class Network(util.DaemonThread): def make_send_requests_job(self, interface): async def job(): try: - while not self.stopped: + while interface.is_running(): try: result = await asyncio.wait_for(asyncio.shield(interface.send_request()), 1) except TimeoutError: continue - if not result and not self.stopped: + if not result and interface.is_running(): await self.connection_down(interface.server, "send_request returned false") except GeneratorExit: pass except: - if not self.stopped: + if interface.is_running(): traceback.print_exc() self.print_error("FATAL ERROR ^^^") return asyncio.ensure_future(job()) @@ -949,7 +949,7 @@ class Network(util.DaemonThread): await self.connection_down(interface.server, "OSError in process_responses") self.print_error("OS error, connection downed") except BaseException: - if not self.stopped: + if interface.is_running(): traceback.print_exc() self.print_error("FATAL ERROR in process_responses") return asyncio.ensure_future(job()) @@ -987,21 +987,21 @@ class Network(util.DaemonThread): try: await self.queue_request('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION], interface) if not await interface.send_request(): - if not self.stopped: + if interface.is_running(): asyncio.ensure_future(self.connection_down(interface.server, "send_request false in boot_interface")) interface.future.set_result("could not send request") return - if self.stopped: + if not interface.is_running(): interface.future.set_result("stopped after sending request") return try: await asyncio.wait_for(interface.get_response(), 1) except TimeoutError: - if not self.stopped: + if interface.is_running(): asyncio.ensure_future(self.connection_down(interface.server, "timeout in boot_interface while getting response")) interface.future.set_result("timeout while getting response") return - if self.stopped: + if not interface.is_running(): interface.future.set_result("stopped after getting response") return #self.interfaces[interface.server] = interface @@ -1010,7 +1010,7 @@ class Network(util.DaemonThread): await asyncio.wait_for(self.switch_to_interface(interface.server), 1) interface.jobs = [asyncio.ensure_future(x) for x in [self.make_ping_job(interface), self.make_send_requests_job(interface), self.make_process_responses_job(interface)]] gathered = asyncio.gather(*interface.jobs) - while not self.stopped: + while interface.is_running(): try: await asyncio.wait_for(asyncio.shield(gathered), 1) except TimeoutError: @@ -1022,7 +1022,7 @@ class Network(util.DaemonThread): self.print_error(interface.server, "GENERATOR EXIT") pass except BaseException as e: - if not self.stopped: + if interface.is_running(): traceback.print_exc() self.print_error("FATAL ERROR in boot_interface") raise e @@ -1039,7 +1039,7 @@ class Network(util.DaemonThread): def make_ping_job(self, interface): async def job(): try: - while not self.stopped: + while interface.is_running(): await asyncio.sleep(1) # Send pings and shut down stale interfaces # must use copy of values @@ -1053,7 +1053,7 @@ class Network(util.DaemonThread): except GeneratorExit: pass except: - if not self.stopped: + if interface.is_running(): traceback.print_exc() self.print_error("FATAL ERROR in ping_job") return asyncio.ensure_future(job())