diff --git a/lib/network.py b/lib/network.py index e60248bc..58295594 100644 --- a/lib/network.py +++ b/lib/network.py @@ -218,7 +218,6 @@ class Network(util.DaemonThread): self.interfaces = {} self.auto_connect = self.config.get('auto_connect', True) self.connecting = set() - self.network_job = None self.proxy = None def register_callback(self, callback, events): @@ -493,8 +492,10 @@ class Network(util.DaemonThread): self.interfaces.pop(interface.server) if interface.server == self.default_server: self.interface = None - if interface.jobs is not None: + if interface.jobs: interface.jobs.cancel() + if interface.boot_job is not None: + interface.boot_job.cancel() if self.process_pending_sends_job is not None: self.process_pending_sends_job.cancel() interface.close() @@ -697,14 +698,15 @@ class Network(util.DaemonThread): interface.tip = 0 interface.mode = 'default' interface.request = None - await self.queued_interfaces.put(interface) + interface.jobs = None + interface.boot_job = None + self.boot_interface(interface) #self.interfaces[server] = interface return interface async def request_chunk(self, interface, idx): interface.print_error("requesting chunk %d" % idx) await self.queue_request('blockchain.block.get_chunk', [idx], interface) - assert interface.jobs interface.request = idx interface.req_time = time.time() @@ -924,38 +926,34 @@ class Network(util.DaemonThread): with b.lock: b.update_size() - async def make_network_job(self, future): - try: - await self.start_network(deserialize_server(self.default_server)[2], - deserialize_proxy(self.config.get('proxy'))) - self.process_pending_sends_job = self.make_process_pending_sends_job() - while self.is_running(): - interface = await self.queued_interfaces.get() + def boot_interface(self, interface): + async def job(): + try: await self.queue_request('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION], interface) if not await interface.send_request(): - print("interface did not work") self.connection_down(interface.server) - continue - gathered = asyncio.gather(self.make_ping_job(interface), self.make_send_requests_job(interface), self.make_process_responses_job(interface)) - interface.jobs = asyncio.ensure_future(gathered) - def cb(fut): - fut.exception() - try: - for i in fut.result(): assert i is None - except CancelledError: - pass - if not future.done(): future.set_result("Network job done") - interface.jobs.add_done_callback(cb) + self.connecting.remove(interface.server) + return + self.connecting.remove(interface.server) self.interfaces[interface.server] = interface await self.queue_request('blockchain.headers.subscribe', [], interface) if interface.server == self.default_server: await self.switch_to_interface(interface.server) + interface.jobs = asyncio.ensure_future(asyncio.gather(self.make_ping_job(interface), self.make_send_requests_job(interface), self.make_process_responses_job(interface))) + def cb(fut): + try: + fut.exception() + except: + pass + interface.jobs.add_done_callback(cb) #self.notify('interfaces') - - except BaseException as e: - traceback.print_exc() - print("FATAL ERROR in network_job") - if not future.done(): future.set_exception(e) + except GeneratorExit: + pass + except BaseException as e: + traceback.print_exc() + print("FATAL ERROR in start_interface") + raise e + interface.boot_job = asyncio.ensure_future(job()) def make_ping_job(self, interface): async def job(): @@ -965,17 +963,16 @@ class Network(util.DaemonThread): # Send pings and shut down stale interfaces # must use copy of values if interface.has_timed_out(): - print("timed out") + print(interface.server, "timed out") self.connection_down(interface.server) elif interface.ping_required(): - print("ping required") params = [ELECTRUM_VERSION, PROTOCOL_VERSION] await self.queue_request('server.version', params, interface) except CancelledError: pass except: traceback.print_exc() - print("FATAL ERRROR in ping_job") + print("FATAL ERROR in ping_job") return asyncio.ensure_future(job()) async def maintain_interfaces(self): @@ -1011,18 +1008,23 @@ class Network(util.DaemonThread): self.loop = loop # so we store it in the instance too self.init_headers_file() self.pending_sends = asyncio.Queue() - self.queued_interfaces = asyncio.Queue() - if not self.network_job: - network_job_future = asyncio.Future() - self.network_job = asyncio.ensure_future(self.make_network_job(network_job_future)) + self.process_pending_sends_job = self.make_process_pending_sends_job() + async def job(): + try: + await self.start_network(deserialize_server(self.default_server)[2], + deserialize_proxy(self.config.get('proxy'))) + except: + traceback.print_exc() + print("Previous exception in start_network") + raise + asyncio.ensure_future(job()) run_future = asyncio.Future() asyncio.ensure_future(self.run_async(run_future)) - combined_task = asyncio.gather(network_job_future, run_future) - loop.run_until_complete(combined_task) - combined_task.exception() - self.print_error("combined task result", combined_task.result()) + loop.run_until_complete(run_future) + run_future.exception() + self.print_error("run future result", run_future.result()) loop.close() async def run_async(self, future): diff --git a/lib/ssl_in_socks.py b/lib/ssl_in_socks.py index 7b6bb44f..9727b03a 100644 --- a/lib/ssl_in_socks.py +++ b/lib/ssl_in_socks.py @@ -1,3 +1,4 @@ +import traceback import ssl from asyncio.sslproto import SSLProtocol import aiosocks @@ -22,15 +23,18 @@ class AppProto(asyncio.Protocol): def makeProtocolFactory(receivedQueue, connUpLock, ca_certs): class MySSLProtocol(SSLProtocol): def connection_lost(self, data): - print("conn lost") super().connection_lost(data) def _on_handshake_complete(self, handshake_exc): super()._on_handshake_complete(handshake_exc) if handshake_exc is not None: print("handshake complete", handshake_exc) - print("cert length", len(self._sslpipe.ssl_object.getpeercert(True))) + try: + print("cert length", len(self._sslpipe.ssl_object.getpeercert(True))) + except ValueError as e: + assert str(e) == "handshake not done yet", e + print("exception was from on_handshake_complete") # TODO how can this happen? Handshake should be done if callback is called def __init__(self): - context = interface.get_ssl_context(cert_reqs=ssl.CERT_REQUIRED if ca_certs is not None else ssl.CERT_NONE, ca_certs=ca_certs) + context = interface.get_ssl_context(cert_reqs=ssl.CERT_REQUIRED if ca_certs is None else ssl.CERT_NONE, ca_certs=ca_certs) proto = AppProto(receivedQueue, connUpLock) super().__init__(asyncio.get_event_loop(), proto, context, None) return MySSLProtocol