asyncio: remove unnecessary code, make interface pass errors to network
This commit is contained in:
parent
072058e93c
commit
a2baeeeff2
@ -68,8 +68,10 @@ class Interface(util.PrintError):
|
|||||||
- Member variable server.
|
- Member variable server.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, server, config_path, proxy_config, is_running):
|
def __init__(self, server, config_path, proxy_config, is_running, exception_handler):
|
||||||
self.is_running = is_running
|
self.error_future = asyncio.Future()
|
||||||
|
self.error_future.add_done_callback(exception_handler)
|
||||||
|
self.is_running = lambda: is_running() and not self.error_future.done()
|
||||||
self.addr = self.auth = None
|
self.addr = self.auth = None
|
||||||
if proxy_config is not None:
|
if proxy_config is not None:
|
||||||
if proxy_config["mode"] == "socks5":
|
if proxy_config["mode"] == "socks5":
|
||||||
@ -145,6 +147,11 @@ class Interface(util.PrintError):
|
|||||||
reader, writer = await asyncio.wait_for(self.conn_coro(context), 3)
|
reader, writer = await asyncio.wait_for(self.conn_coro(context), 3)
|
||||||
dercert = writer.get_extra_info('ssl_object').getpeercert(True)
|
dercert = writer.get_extra_info('ssl_object').getpeercert(True)
|
||||||
writer.close()
|
writer.close()
|
||||||
|
except OSError as e: # not ConnectionError because we need socket.gaierror too
|
||||||
|
if self.is_running():
|
||||||
|
self.print_error(self.server, "Exception in _save_certificate", type(e))
|
||||||
|
if not self.error_future.done(): self.error_future.set_result(e)
|
||||||
|
return
|
||||||
except TimeoutError:
|
except TimeoutError:
|
||||||
return
|
return
|
||||||
assert dercert
|
assert dercert
|
||||||
@ -296,6 +303,7 @@ class Interface(util.PrintError):
|
|||||||
|
|
||||||
def has_timed_out(self):
|
def has_timed_out(self):
|
||||||
'''Returns True if the interface has timed out.'''
|
'''Returns True if the interface has timed out.'''
|
||||||
|
if self.error_future.done(): return True
|
||||||
if (self.unanswered_requests and time.time() - self.request_time > 10
|
if (self.unanswered_requests and time.time() - self.request_time > 10
|
||||||
and self.idle_time() > 10):
|
and self.idle_time() > 10):
|
||||||
self.print_error("timeout", len(self.unanswered_requests))
|
self.print_error("timeout", len(self.unanswered_requests))
|
||||||
|
|||||||
112
lib/network.py
112
lib/network.py
@ -167,7 +167,6 @@ class Network(util.DaemonThread):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, config=None):
|
def __init__(self, config=None):
|
||||||
self.send_requests_jobs = {}
|
|
||||||
self.disconnected_servers = {}
|
self.disconnected_servers = {}
|
||||||
self.connecting = set()
|
self.connecting = set()
|
||||||
self.stopped = True
|
self.stopped = True
|
||||||
@ -419,27 +418,14 @@ class Network(util.DaemonThread):
|
|||||||
self.print_error("stopping network")
|
self.print_error("stopping network")
|
||||||
async def stop(interface):
|
async def stop(interface):
|
||||||
await self.connection_down(interface.server, "stopping network")
|
await self.connection_down(interface.server, "stopping network")
|
||||||
await asyncio.wait_for(asyncio.shield(interface.future), 3)
|
await interface.future
|
||||||
stopped_this_time = set()
|
stopped_this_time = set()
|
||||||
while self.interfaces:
|
while self.interfaces:
|
||||||
do_next = next(iter(self.interfaces.values()))
|
do_next = next(iter(self.interfaces.values()))
|
||||||
assert do_next not in stopped_this_time
|
|
||||||
for i in self.disconnected_servers:
|
|
||||||
assert i not in self.interfaces.keys()
|
|
||||||
assert i != do_next.server
|
|
||||||
stopped_this_time.add(do_next)
|
stopped_this_time.add(do_next)
|
||||||
await stop(do_next)
|
await stop(do_next)
|
||||||
if self.interface:
|
self.process_pending_sends_job.cancel()
|
||||||
assert self.interface.server in stopped_this_time, self.interface.server
|
await self.process_pending_sends_job
|
||||||
await asyncio.wait_for(asyncio.shield(self.process_pending_sends_job), 5)
|
|
||||||
assert self.interface is None
|
|
||||||
for i in range(100):
|
|
||||||
if not self.interfaces:
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
await asyncio.sleep(0.1)
|
|
||||||
if self.interfaces:
|
|
||||||
assert False, "interfaces not empty after waiting: " + repr(self.interfaces)
|
|
||||||
|
|
||||||
# called from the Qt thread
|
# called from the Qt thread
|
||||||
def set_parameters(self, host, port, protocol, proxy, auto_connect):
|
def set_parameters(self, host, port, protocol, proxy, auto_connect):
|
||||||
@ -516,8 +502,8 @@ class Network(util.DaemonThread):
|
|||||||
await self.start_interface(server)
|
await self.start_interface(server)
|
||||||
return
|
return
|
||||||
i = self.interfaces[server]
|
i = self.interfaces[server]
|
||||||
if self.interface != i:
|
if self.interface != i and self.num_server != 0:
|
||||||
self.print_error("switching to", server)
|
self.print_error("switching to", server, "from", self.interface.server if self.interface else "None")
|
||||||
# stop any current interface in order to terminate subscriptions
|
# stop any current interface in order to terminate subscriptions
|
||||||
# fixme: we don't want to close headers sub
|
# fixme: we don't want to close headers sub
|
||||||
#self.close_interface(self.interface)
|
#self.close_interface(self.interface)
|
||||||
@ -535,12 +521,7 @@ class Network(util.DaemonThread):
|
|||||||
self.interface = None
|
self.interface = None
|
||||||
if interface.jobs:
|
if interface.jobs:
|
||||||
await asyncio.gather(*interface.jobs)
|
await asyncio.gather(*interface.jobs)
|
||||||
assert interface.boot_job
|
await interface.boot_job
|
||||||
try:
|
|
||||||
await asyncio.wait_for(asyncio.shield(interface.boot_job), 6) # longer than any timeout while connecting
|
|
||||||
except TimeoutError:
|
|
||||||
self.print_error("taking too long", interface.server)
|
|
||||||
raise
|
|
||||||
interface.close()
|
interface.close()
|
||||||
|
|
||||||
def add_recent_server(self, server):
|
def add_recent_server(self, server):
|
||||||
@ -738,11 +719,11 @@ class Network(util.DaemonThread):
|
|||||||
raise Exception("already disconnected " + server + " because " + repr(self.disconnected_servers[server]) + ". new reason: " + repr(reason))
|
raise Exception("already disconnected " + server + " because " + repr(self.disconnected_servers[server]) + ". new reason: " + repr(reason))
|
||||||
except:
|
except:
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
sys.exit(1)
|
|
||||||
return
|
return
|
||||||
self.print_error("connection down", server)
|
self.print_error("connection down", server, reason)
|
||||||
assert not self.send_requests_jobs[server].cancelled()
|
for i in self.interfaces[server].jobs:
|
||||||
self.send_requests_jobs[server].cancel()
|
assert not i.cancelled()
|
||||||
|
i.cancel()
|
||||||
self.disconnected_servers[server] = reason
|
self.disconnected_servers[server] = reason
|
||||||
if server == self.default_server:
|
if server == self.default_server:
|
||||||
self.set_status('disconnected')
|
self.set_status('disconnected')
|
||||||
@ -756,7 +737,13 @@ class Network(util.DaemonThread):
|
|||||||
def new_interface(self, server):
|
def new_interface(self, server):
|
||||||
# todo: get tip first, then decide which checkpoint to use.
|
# todo: get tip first, then decide which checkpoint to use.
|
||||||
self.add_recent_server(server)
|
self.add_recent_server(server)
|
||||||
interface = Interface(server, self.config.path, self.proxy, lambda: not self.stopped and server in self.interfaces)
|
def interface_exception_handler(exception_future):
|
||||||
|
try:
|
||||||
|
raise exception_future.result()
|
||||||
|
except Exception as e:
|
||||||
|
print(type(e), " handled in new_interface")
|
||||||
|
asyncio.ensure_future(self.connection_down(server, "error in interface"))
|
||||||
|
interface = Interface(server, self.config.path, self.proxy, lambda: not self.stopped and server in self.interfaces, interface_exception_handler)
|
||||||
interface.future = asyncio.Future()
|
interface.future = asyncio.Future()
|
||||||
interface.blockchain = None
|
interface.blockchain = None
|
||||||
interface.tip_header = None
|
interface.tip_header = None
|
||||||
@ -937,28 +924,28 @@ class Network(util.DaemonThread):
|
|||||||
try:
|
try:
|
||||||
while True:
|
while True:
|
||||||
await interface.send_request()
|
await interface.send_request()
|
||||||
except:
|
except CancelledError:
|
||||||
if interface.is_running():
|
pass
|
||||||
|
except Exception as e:
|
||||||
|
print("send requests exp", str(e))
|
||||||
|
if interface.is_running() or self.num_server == 0:
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
self.print_error("FATAL ERROR ^^^")
|
self.print_error("FATAL ERROR ^^^")
|
||||||
|
await self.connection_down(interface.server, "exp while send_request")
|
||||||
|
|
||||||
job = asyncio.ensure_future(job())
|
return asyncio.ensure_future(job())
|
||||||
|
|
||||||
if interface.server in self.send_requests_jobs: assert self.send_requests_jobs[interface.server].done()
|
|
||||||
self.send_requests_jobs[interface.server] = job
|
|
||||||
return job
|
|
||||||
|
|
||||||
def make_process_responses_job(self, interface):
|
def make_process_responses_job(self, interface):
|
||||||
async def job():
|
async def job():
|
||||||
try:
|
try:
|
||||||
await self.process_responses(interface)
|
await self.process_responses(interface)
|
||||||
except GeneratorExit:
|
|
||||||
pass
|
|
||||||
except OSError:
|
except OSError:
|
||||||
await self.connection_down(interface.server, "OSError in process_responses")
|
await self.connection_down(interface.server, "OSError in process_responses")
|
||||||
self.print_error("OS error, connection downed")
|
self.print_error("OS error, connection downed")
|
||||||
except BaseException:
|
except CancelledError:
|
||||||
if interface.is_running():
|
pass
|
||||||
|
except:
|
||||||
|
if interface.is_running() or self.num_server == 0:
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
self.print_error("FATAL ERROR in process_responses")
|
self.print_error("FATAL ERROR in process_responses")
|
||||||
return asyncio.ensure_future(job())
|
return asyncio.ensure_future(job())
|
||||||
@ -966,15 +953,12 @@ class Network(util.DaemonThread):
|
|||||||
def make_process_pending_sends_job(self):
|
def make_process_pending_sends_job(self):
|
||||||
async def job():
|
async def job():
|
||||||
try:
|
try:
|
||||||
while not self.stopped:
|
while True:
|
||||||
try:
|
await self.process_pending_sends()
|
||||||
await asyncio.wait_for(asyncio.shield(self.process_pending_sends()), 1)
|
except CancelledError:
|
||||||
except TimeoutError:
|
pass
|
||||||
continue
|
except:
|
||||||
#except CancelledError:
|
if not self.stopped or self.num_server == 0:
|
||||||
# pass
|
|
||||||
except BaseException as e:
|
|
||||||
if not self.stopped:
|
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
self.print_error("FATAL ERROR in process_pending_sends")
|
self.print_error("FATAL ERROR in process_pending_sends")
|
||||||
return asyncio.ensure_future(job())
|
return asyncio.ensure_future(job())
|
||||||
@ -994,13 +978,13 @@ class Network(util.DaemonThread):
|
|||||||
def boot_interface(self, interface):
|
def boot_interface(self, interface):
|
||||||
async def job():
|
async def job():
|
||||||
try:
|
try:
|
||||||
send_requests_job = self.make_send_requests_job(interface)
|
interface.jobs = [self.make_send_requests_job(interface)] # we need this job to process the request queued below
|
||||||
try:
|
try:
|
||||||
await asyncio.wait_for(self.queue_request('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION], interface), 1)
|
await asyncio.wait_for(self.queue_request('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION], interface), 1)
|
||||||
except TimeoutError:
|
except TimeoutError:
|
||||||
if interface.is_running():
|
asyncio.ensure_future(self.connection_down(interface.server, "couldn't send initial version"))
|
||||||
asyncio.ensure_future(self.connection_down(interface.server, "send_request timeout in boot_interface"))
|
interface.future.set_result("couldn't send initial version")
|
||||||
interface.future.set_result("could not send request")
|
|
||||||
return
|
return
|
||||||
if not interface.is_running():
|
if not interface.is_running():
|
||||||
interface.future.set_result("stopped after sending request")
|
interface.future.set_result("stopped after sending request")
|
||||||
@ -1008,8 +992,7 @@ class Network(util.DaemonThread):
|
|||||||
try:
|
try:
|
||||||
await asyncio.wait_for(interface.get_response(), 1)
|
await asyncio.wait_for(interface.get_response(), 1)
|
||||||
except TimeoutError:
|
except TimeoutError:
|
||||||
if interface.is_running():
|
asyncio.ensure_future(self.connection_down(interface.server, "timeout in boot_interface while getting response"))
|
||||||
asyncio.ensure_future(self.connection_down(interface.server, "timeout in boot_interface while getting response"))
|
|
||||||
interface.future.set_result("timeout while getting response")
|
interface.future.set_result("timeout while getting response")
|
||||||
return
|
return
|
||||||
if not interface.is_running():
|
if not interface.is_running():
|
||||||
@ -1019,7 +1002,7 @@ class Network(util.DaemonThread):
|
|||||||
await self.queue_request('blockchain.headers.subscribe', [], interface)
|
await self.queue_request('blockchain.headers.subscribe', [], interface)
|
||||||
if interface.server == self.default_server:
|
if interface.server == self.default_server:
|
||||||
await asyncio.wait_for(self.switch_to_interface(interface.server), 1)
|
await asyncio.wait_for(self.switch_to_interface(interface.server), 1)
|
||||||
interface.jobs = [send_requests_job, self.make_process_responses_job(interface)]
|
interface.jobs += [self.make_process_responses_job(interface)]
|
||||||
gathered = asyncio.gather(*interface.jobs)
|
gathered = asyncio.gather(*interface.jobs)
|
||||||
while interface.is_running():
|
while interface.is_running():
|
||||||
try:
|
try:
|
||||||
@ -1029,17 +1012,15 @@ class Network(util.DaemonThread):
|
|||||||
if interface.has_timed_out():
|
if interface.has_timed_out():
|
||||||
self.print_error(interface.server, "timed out")
|
self.print_error(interface.server, "timed out")
|
||||||
await self.connection_down(interface.server, "time out in ping_job")
|
await self.connection_down(interface.server, "time out in ping_job")
|
||||||
|
break
|
||||||
elif interface.ping_required():
|
elif interface.ping_required():
|
||||||
params = [ELECTRUM_VERSION, PROTOCOL_VERSION]
|
params = [ELECTRUM_VERSION, PROTOCOL_VERSION]
|
||||||
await self.queue_request('server.version', params, interface)
|
await self.queue_request('server.version', params, interface)
|
||||||
interface.future.set_result("finished")
|
interface.future.set_result("finished")
|
||||||
return
|
return
|
||||||
#self.notify('interfaces')
|
#self.notify('interfaces')
|
||||||
except GeneratorExit:
|
|
||||||
self.print_error(interface.server, "GENERATOR EXIT")
|
|
||||||
pass
|
|
||||||
except BaseException as e:
|
except BaseException as e:
|
||||||
if interface.is_running():
|
if interface.is_running() or self.num_server == 0:
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
self.print_error("FATAL ERROR in boot_interface")
|
self.print_error("FATAL ERROR in boot_interface")
|
||||||
raise e
|
raise e
|
||||||
@ -1122,15 +1103,6 @@ class Network(util.DaemonThread):
|
|||||||
self.run_jobs()
|
self.run_jobs()
|
||||||
await self.stop_network()
|
await self.stop_network()
|
||||||
self.on_stop()
|
self.on_stop()
|
||||||
for i in asyncio.Task.all_tasks():
|
|
||||||
if asyncio.Task.current_task() == i: continue
|
|
||||||
try:
|
|
||||||
await asyncio.wait_for(asyncio.shield(i), 2)
|
|
||||||
except TimeoutError:
|
|
||||||
self.print_error("TOO SLOW TO SHUT DOWN, CANCELLING", i)
|
|
||||||
i.cancel()
|
|
||||||
except CancelledError:
|
|
||||||
pass
|
|
||||||
future.set_result("run_async done")
|
future.set_result("run_async done")
|
||||||
except BaseException as e:
|
except BaseException as e:
|
||||||
future.set_exception(e)
|
future.set_exception(e)
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user