asyncio: use is_running of interface instead of global stop flag
This commit is contained in:
parent
6154e93222
commit
b427f55caf
@ -595,7 +595,7 @@ class Network(util.DaemonThread):
|
||||
return str(method) + (':' + str(params[0]) if params else '')
|
||||
|
||||
async def process_responses(self, interface):
|
||||
while not self.stopped:
|
||||
while interface.is_running():
|
||||
request, response = await interface.get_response()
|
||||
if request:
|
||||
method, params, message_id = request
|
||||
@ -621,7 +621,7 @@ class Network(util.DaemonThread):
|
||||
self.subscribed_addresses.add(params[0])
|
||||
else:
|
||||
if not response: # Closed remotely / misbehaving
|
||||
if not self.stopped: await self.connection_down(interface.server, "no response in process responses")
|
||||
if interface.is_running(): await self.connection_down(interface.server, "no response in process responses")
|
||||
return
|
||||
# Rewrite response shape to match subscription request response
|
||||
method = response.get('method')
|
||||
@ -740,7 +740,7 @@ class Network(util.DaemonThread):
|
||||
async def new_interface(self, server):
|
||||
# todo: get tip first, then decide which checkpoint to use.
|
||||
self.add_recent_server(server)
|
||||
interface = Interface(server, self.config.path, self.proxy, lambda: not self.stopped)
|
||||
interface = Interface(server, self.config.path, self.proxy, lambda: not self.stopped and server not in self.interfaces)
|
||||
interface.future = asyncio.Future()
|
||||
interface.blockchain = None
|
||||
interface.tip_header = None
|
||||
@ -924,17 +924,17 @@ class Network(util.DaemonThread):
|
||||
def make_send_requests_job(self, interface):
|
||||
async def job():
|
||||
try:
|
||||
while not self.stopped:
|
||||
while interface.is_running():
|
||||
try:
|
||||
result = await asyncio.wait_for(asyncio.shield(interface.send_request()), 1)
|
||||
except TimeoutError:
|
||||
continue
|
||||
if not result and not self.stopped:
|
||||
if not result and interface.is_running():
|
||||
await self.connection_down(interface.server, "send_request returned false")
|
||||
except GeneratorExit:
|
||||
pass
|
||||
except:
|
||||
if not self.stopped:
|
||||
if interface.is_running():
|
||||
traceback.print_exc()
|
||||
self.print_error("FATAL ERROR ^^^")
|
||||
return asyncio.ensure_future(job())
|
||||
@ -949,7 +949,7 @@ class Network(util.DaemonThread):
|
||||
await self.connection_down(interface.server, "OSError in process_responses")
|
||||
self.print_error("OS error, connection downed")
|
||||
except BaseException:
|
||||
if not self.stopped:
|
||||
if interface.is_running():
|
||||
traceback.print_exc()
|
||||
self.print_error("FATAL ERROR in process_responses")
|
||||
return asyncio.ensure_future(job())
|
||||
@ -987,21 +987,21 @@ class Network(util.DaemonThread):
|
||||
try:
|
||||
await self.queue_request('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION], interface)
|
||||
if not await interface.send_request():
|
||||
if not self.stopped:
|
||||
if interface.is_running():
|
||||
asyncio.ensure_future(self.connection_down(interface.server, "send_request false in boot_interface"))
|
||||
interface.future.set_result("could not send request")
|
||||
return
|
||||
if self.stopped:
|
||||
if not interface.is_running():
|
||||
interface.future.set_result("stopped after sending request")
|
||||
return
|
||||
try:
|
||||
await asyncio.wait_for(interface.get_response(), 1)
|
||||
except TimeoutError:
|
||||
if not self.stopped:
|
||||
if interface.is_running():
|
||||
asyncio.ensure_future(self.connection_down(interface.server, "timeout in boot_interface while getting response"))
|
||||
interface.future.set_result("timeout while getting response")
|
||||
return
|
||||
if self.stopped:
|
||||
if not interface.is_running():
|
||||
interface.future.set_result("stopped after getting response")
|
||||
return
|
||||
#self.interfaces[interface.server] = interface
|
||||
@ -1010,7 +1010,7 @@ class Network(util.DaemonThread):
|
||||
await asyncio.wait_for(self.switch_to_interface(interface.server), 1)
|
||||
interface.jobs = [asyncio.ensure_future(x) for x in [self.make_ping_job(interface), self.make_send_requests_job(interface), self.make_process_responses_job(interface)]]
|
||||
gathered = asyncio.gather(*interface.jobs)
|
||||
while not self.stopped:
|
||||
while interface.is_running():
|
||||
try:
|
||||
await asyncio.wait_for(asyncio.shield(gathered), 1)
|
||||
except TimeoutError:
|
||||
@ -1022,7 +1022,7 @@ class Network(util.DaemonThread):
|
||||
self.print_error(interface.server, "GENERATOR EXIT")
|
||||
pass
|
||||
except BaseException as e:
|
||||
if not self.stopped:
|
||||
if interface.is_running():
|
||||
traceback.print_exc()
|
||||
self.print_error("FATAL ERROR in boot_interface")
|
||||
raise e
|
||||
@ -1039,7 +1039,7 @@ class Network(util.DaemonThread):
|
||||
def make_ping_job(self, interface):
|
||||
async def job():
|
||||
try:
|
||||
while not self.stopped:
|
||||
while interface.is_running():
|
||||
await asyncio.sleep(1)
|
||||
# Send pings and shut down stale interfaces
|
||||
# must use copy of values
|
||||
@ -1053,7 +1053,7 @@ class Network(util.DaemonThread):
|
||||
except GeneratorExit:
|
||||
pass
|
||||
except:
|
||||
if not self.stopped:
|
||||
if interface.is_running():
|
||||
traceback.print_exc()
|
||||
self.print_error("FATAL ERROR in ping_job")
|
||||
return asyncio.ensure_future(job())
|
||||
|
||||
Loading…
Reference in New Issue
Block a user