diff --git a/lib/interface.py b/lib/interface.py index 95c30f5c..11ab8196 100644 --- a/lib/interface.py +++ b/lib/interface.py @@ -145,10 +145,6 @@ class Interface(util.PrintError): reader, writer = await asyncio.wait_for(self.conn_coro(context), 3) dercert = writer.get_extra_info('ssl_object').getpeercert(True) writer.close() - except OSError as e: # not ConnectionError because we need socket.gaierror too - if self.is_running(): - self.print_error(self.server, "Exception in _save_certificate", type(e)) - return except TimeoutError: return assert dercert @@ -263,6 +259,7 @@ class Interface(util.PrintError): ''' self.request_time = time.time() await self.unsent_requests.put((self.request_time, args)) + await self.unsent_requests.join() def num_requests(self): '''Keep unanswered requests below 100''' @@ -270,25 +267,22 @@ class Interface(util.PrintError): return min(n, self.unsent_requests.qsize()) async def send_request(self): - '''Sends queued requests. Returns False on failure.''' + '''Sends a queued request.''' make_dict = lambda m, p, i: {'method': m, 'params': p, 'id': i} n = self.num_requests() - try: - prio, request = await asyncio.wait_for(self.unsent_requests.get(), 1.5) - except TimeoutError: - return False + prio, request = await self.unsent_requests.get() try: await self.send_all([make_dict(*request)]) except (SocksError, OSError, TimeoutError) as e: if type(e) is SocksError: self.print_error(e) await self.unsent_requests.put((prio, request)) - return False + return + self.unsent_requests.task_done() if self.debug: self.print_error("-->", request) self.unanswered_requests[request[2]] = request self.last_action = time.time() - return True def ping_required(self): '''Maintains time since last ping. Returns True if a ping should diff --git a/lib/network.py b/lib/network.py index 4426c58e..186dce0e 100644 --- a/lib/network.py +++ b/lib/network.py @@ -167,6 +167,7 @@ class Network(util.DaemonThread): """ def __init__(self, config=None): + self.send_requests_jobs = {} self.disconnected_servers = {} self.connecting = set() self.stopped = True @@ -533,8 +534,7 @@ class Network(util.DaemonThread): if interface.server == self.default_server: self.interface = None if interface.jobs: - for i in interface.jobs: - asyncio.wait_for(i, 3) + await asyncio.gather(*interface.jobs) assert interface.boot_job try: await asyncio.wait_for(asyncio.shield(interface.boot_job), 6) # longer than any timeout while connecting @@ -741,6 +741,8 @@ class Network(util.DaemonThread): sys.exit(1) return self.print_error("connection down", server) + assert not self.send_requests_jobs[server].cancelled() + self.send_requests_jobs[server].cancel() self.disconnected_servers[server] = reason if server == self.default_server: self.set_status('disconnected') @@ -933,20 +935,18 @@ class Network(util.DaemonThread): def make_send_requests_job(self, interface): async def job(): try: - while interface.is_running(): - try: - result = await asyncio.wait_for(asyncio.shield(interface.send_request()), 1) - except TimeoutError: - continue - if not result and interface.is_running(): - await self.connection_down(interface.server, "send_request returned false") - except GeneratorExit: - pass + while True: + await interface.send_request() except: if interface.is_running(): traceback.print_exc() self.print_error("FATAL ERROR ^^^") - return asyncio.ensure_future(job()) + + job = asyncio.ensure_future(job()) + + if interface.server in self.send_requests_jobs: assert self.send_requests_jobs[interface.server].done() + self.send_requests_jobs[interface.server] = job + return job def make_process_responses_job(self, interface): async def job(): @@ -994,10 +994,12 @@ class Network(util.DaemonThread): def boot_interface(self, interface): async def job(): try: - await self.queue_request('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION], interface) - if not await interface.send_request(): + send_requests_job = self.make_send_requests_job(interface) + try: + await asyncio.wait_for(self.queue_request('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION], interface), 1) + except TimeoutError: if interface.is_running(): - asyncio.ensure_future(self.connection_down(interface.server, "send_request false in boot_interface")) + asyncio.ensure_future(self.connection_down(interface.server, "send_request timeout in boot_interface")) interface.future.set_result("could not send request") return if not interface.is_running(): @@ -1017,7 +1019,7 @@ class Network(util.DaemonThread): await self.queue_request('blockchain.headers.subscribe', [], interface) if interface.server == self.default_server: await asyncio.wait_for(self.switch_to_interface(interface.server), 1) - interface.jobs = [asyncio.ensure_future(x) for x in [self.make_send_requests_job(interface), self.make_process_responses_job(interface)]] + interface.jobs = [send_requests_job, self.make_process_responses_job(interface)] gathered = asyncio.gather(*interface.jobs) while interface.is_running(): try: diff --git a/lib/util.py b/lib/util.py index a2b4c885..67467d68 100644 --- a/lib/util.py +++ b/lib/util.py @@ -228,7 +228,13 @@ class DaemonThread(threading.Thread, PrintError): self.forever_coroutines_queue = asyncio.Queue() # making queue here because __init__ is called from non-network thread self.loop = asyncio.get_event_loop() async def getFromQueueAndStart(): - jobs = await self.forever_coroutines_queue.get() + while True: + try: + jobs = await asyncio.wait_for(self.forever_coroutines_queue.get(), 1) + break + except asyncio.TimeoutError: + if not self.is_running(): break + continue await asyncio.gather(*[i.run(self.is_running) for i in jobs]) self.print_error("FOREVER JOBS DONE") self.forever_coroutines_task = asyncio.ensure_future(getFromQueueAndStart())