asyncio: try interfaces in parallel

This commit is contained in:
Janus 2017-12-13 20:13:48 +01:00
parent 88f906bc2a
commit 3ffedf83fc
2 changed files with 48 additions and 42 deletions

View File

@ -218,7 +218,6 @@ class Network(util.DaemonThread):
self.interfaces = {}
self.auto_connect = self.config.get('auto_connect', True)
self.connecting = set()
self.network_job = None
self.proxy = None
def register_callback(self, callback, events):
@ -493,8 +492,10 @@ class Network(util.DaemonThread):
self.interfaces.pop(interface.server)
if interface.server == self.default_server:
self.interface = None
if interface.jobs is not None:
if interface.jobs:
interface.jobs.cancel()
if interface.boot_job is not None:
interface.boot_job.cancel()
if self.process_pending_sends_job is not None:
self.process_pending_sends_job.cancel()
interface.close()
@ -697,14 +698,15 @@ class Network(util.DaemonThread):
interface.tip = 0
interface.mode = 'default'
interface.request = None
await self.queued_interfaces.put(interface)
interface.jobs = None
interface.boot_job = None
self.boot_interface(interface)
#self.interfaces[server] = interface
return interface
async def request_chunk(self, interface, idx):
interface.print_error("requesting chunk %d" % idx)
await self.queue_request('blockchain.block.get_chunk', [idx], interface)
assert interface.jobs
interface.request = idx
interface.req_time = time.time()
@ -924,38 +926,34 @@ class Network(util.DaemonThread):
with b.lock:
b.update_size()
async def make_network_job(self, future):
try:
await self.start_network(deserialize_server(self.default_server)[2],
deserialize_proxy(self.config.get('proxy')))
self.process_pending_sends_job = self.make_process_pending_sends_job()
while self.is_running():
interface = await self.queued_interfaces.get()
def boot_interface(self, interface):
async def job():
try:
await self.queue_request('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION], interface)
if not await interface.send_request():
print("interface did not work")
self.connection_down(interface.server)
continue
gathered = asyncio.gather(self.make_ping_job(interface), self.make_send_requests_job(interface), self.make_process_responses_job(interface))
interface.jobs = asyncio.ensure_future(gathered)
def cb(fut):
fut.exception()
try:
for i in fut.result(): assert i is None
except CancelledError:
pass
if not future.done(): future.set_result("Network job done")
interface.jobs.add_done_callback(cb)
self.connecting.remove(interface.server)
return
self.connecting.remove(interface.server)
self.interfaces[interface.server] = interface
await self.queue_request('blockchain.headers.subscribe', [], interface)
if interface.server == self.default_server:
await self.switch_to_interface(interface.server)
interface.jobs = asyncio.ensure_future(asyncio.gather(self.make_ping_job(interface), self.make_send_requests_job(interface), self.make_process_responses_job(interface)))
def cb(fut):
try:
fut.exception()
except:
pass
interface.jobs.add_done_callback(cb)
#self.notify('interfaces')
except BaseException as e:
traceback.print_exc()
print("FATAL ERROR in network_job")
if not future.done(): future.set_exception(e)
except GeneratorExit:
pass
except BaseException as e:
traceback.print_exc()
print("FATAL ERROR in start_interface")
raise e
interface.boot_job = asyncio.ensure_future(job())
def make_ping_job(self, interface):
async def job():
@ -965,17 +963,16 @@ class Network(util.DaemonThread):
# Send pings and shut down stale interfaces
# must use copy of values
if interface.has_timed_out():
print("timed out")
print(interface.server, "timed out")
self.connection_down(interface.server)
elif interface.ping_required():
print("ping required")
params = [ELECTRUM_VERSION, PROTOCOL_VERSION]
await self.queue_request('server.version', params, interface)
except CancelledError:
pass
except:
traceback.print_exc()
print("FATAL ERRROR in ping_job")
print("FATAL ERROR in ping_job")
return asyncio.ensure_future(job())
async def maintain_interfaces(self):
@ -1011,18 +1008,23 @@ class Network(util.DaemonThread):
self.loop = loop # so we store it in the instance too
self.init_headers_file()
self.pending_sends = asyncio.Queue()
self.queued_interfaces = asyncio.Queue()
if not self.network_job:
network_job_future = asyncio.Future()
self.network_job = asyncio.ensure_future(self.make_network_job(network_job_future))
self.process_pending_sends_job = self.make_process_pending_sends_job()
async def job():
try:
await self.start_network(deserialize_server(self.default_server)[2],
deserialize_proxy(self.config.get('proxy')))
except:
traceback.print_exc()
print("Previous exception in start_network")
raise
asyncio.ensure_future(job())
run_future = asyncio.Future()
asyncio.ensure_future(self.run_async(run_future))
combined_task = asyncio.gather(network_job_future, run_future)
loop.run_until_complete(combined_task)
combined_task.exception()
self.print_error("combined task result", combined_task.result())
loop.run_until_complete(run_future)
run_future.exception()
self.print_error("run future result", run_future.result())
loop.close()
async def run_async(self, future):

View File

@ -1,3 +1,4 @@
import traceback
import ssl
from asyncio.sslproto import SSLProtocol
import aiosocks
@ -22,15 +23,18 @@ class AppProto(asyncio.Protocol):
def makeProtocolFactory(receivedQueue, connUpLock, ca_certs):
class MySSLProtocol(SSLProtocol):
def connection_lost(self, data):
print("conn lost")
super().connection_lost(data)
def _on_handshake_complete(self, handshake_exc):
super()._on_handshake_complete(handshake_exc)
if handshake_exc is not None:
print("handshake complete", handshake_exc)
print("cert length", len(self._sslpipe.ssl_object.getpeercert(True)))
try:
print("cert length", len(self._sslpipe.ssl_object.getpeercert(True)))
except ValueError as e:
assert str(e) == "handshake not done yet", e
print("exception was from on_handshake_complete") # TODO how can this happen? Handshake should be done if callback is called
def __init__(self):
context = interface.get_ssl_context(cert_reqs=ssl.CERT_REQUIRED if ca_certs is not None else ssl.CERT_NONE, ca_certs=ca_certs)
context = interface.get_ssl_context(cert_reqs=ssl.CERT_REQUIRED if ca_certs is None else ssl.CERT_NONE, ca_certs=ca_certs)
proto = AppProto(receivedQueue, connUpLock)
super().__init__(asyncio.get_event_loop(), proto, context, None)
return MySSLProtocol