asyncio: more robost network connection handling, shorter timeouts
This commit is contained in:
parent
1555100632
commit
37f1e3bd95
@ -140,12 +140,12 @@ class Interface(util.PrintError):
|
||||
if not self.is_running(): return
|
||||
transport.close()
|
||||
else:
|
||||
reader, writer = await asyncio.wait_for(self.conn_coro(context), 5)
|
||||
reader, writer = await asyncio.wait_for(self.conn_coro(context), 3)
|
||||
dercert = writer.get_extra_info('ssl_object').getpeercert(True)
|
||||
writer.close()
|
||||
except OSError as e: # not ConnectionError because we need socket.gaierror too
|
||||
if self.is_running():
|
||||
print("Exception in _save_certificate", type(e))
|
||||
print(self.server, "Exception in _save_certificate", type(e))
|
||||
return
|
||||
except TimeoutError:
|
||||
return
|
||||
|
||||
126
lib/network.py
126
lib/network.py
@ -20,6 +20,7 @@
|
||||
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
||||
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
# SOFTWARE.
|
||||
import collections
|
||||
from functools import partial
|
||||
import time
|
||||
import queue
|
||||
@ -220,8 +221,6 @@ class Network(util.DaemonThread):
|
||||
self.interface = None
|
||||
self.interfaces = {}
|
||||
self.auto_connect = self.config.get('auto_connect', True)
|
||||
self.connecting = set()
|
||||
self.proxy = None
|
||||
|
||||
def register_callback(self, callback, events):
|
||||
with self.lock:
|
||||
@ -374,15 +373,16 @@ class Network(util.DaemonThread):
|
||||
return out
|
||||
|
||||
async def start_interface(self, server):
|
||||
if (not server in self.interfaces and not server in self.connecting):
|
||||
if server == self.default_server:
|
||||
self.print_error("connecting to %s as new interface" % server)
|
||||
self.set_status('connecting')
|
||||
self.connecting.add(server)
|
||||
return await self.new_interface(server)
|
||||
assert not self.connecting[server].locked()
|
||||
async with self.connecting[server]:
|
||||
if (not server in self.interfaces):
|
||||
if server == self.default_server:
|
||||
self.print_error("connecting to %s as new interface" % server)
|
||||
self.set_status('connecting')
|
||||
return await self.new_interface(server)
|
||||
|
||||
async def start_random_interface(self):
|
||||
exclude_set = self.disconnected_servers.union(set(self.interfaces))
|
||||
exclude_set = self.disconnected_servers.union(set(self.interfaces.keys()))
|
||||
server = pick_random_server(self.get_servers(), self.protocol, exclude_set)
|
||||
if server:
|
||||
return await self.start_interface(server)
|
||||
@ -396,7 +396,7 @@ class Network(util.DaemonThread):
|
||||
async def start_network(self, protocol, proxy):
|
||||
self.stopped = False
|
||||
assert not self.interface and not self.interfaces
|
||||
assert not self.connecting
|
||||
assert all(not i.locked() for i in self.connecting.values())
|
||||
self.print_error('starting network')
|
||||
self.disconnected_servers = set([])
|
||||
self.protocol = protocol
|
||||
@ -406,13 +406,21 @@ class Network(util.DaemonThread):
|
||||
async def stop_network(self):
|
||||
self.stopped = True
|
||||
self.print_error("stopping network")
|
||||
list_with_main = ([self.interface] if self.interface else [])
|
||||
for num, interface in enumerate(list(self.interfaces.values()) + list_with_main):
|
||||
async def stop(interface):
|
||||
while True:
|
||||
try:
|
||||
await asyncio.wait_for(asyncio.shield(self.connection_down(interface.server)), 1.5)
|
||||
break
|
||||
except TimeoutError:
|
||||
print("close_interface too slow...", interface.server)
|
||||
try:
|
||||
await asyncio.wait_for(asyncio.shield(self.close_interface(interface)), 5)
|
||||
await asyncio.wait_for(asyncio.shield(interface.future), 5)
|
||||
print(interface.server, await asyncio.wait_for(asyncio.shield(interface.future), 3))
|
||||
except TimeoutError:
|
||||
print("close_interface too slow...")
|
||||
print("interface future too slow", interface.server)
|
||||
if self.interface:
|
||||
await stop(self.interface)
|
||||
while self.interfaces:
|
||||
await stop(next(iter(self.interfaces.values())))
|
||||
await asyncio.wait_for(asyncio.shield(self.process_pending_sends_job), 5)
|
||||
assert self.interface is None
|
||||
for i in range(100):
|
||||
@ -422,7 +430,8 @@ class Network(util.DaemonThread):
|
||||
await asyncio.sleep(0.1)
|
||||
if self.interfaces:
|
||||
assert False, "interfaces not empty after waiting: " + repr(self.interfaces)
|
||||
self.connecting = set()
|
||||
for i in self.connecting.values():
|
||||
assert not i.locked()
|
||||
|
||||
# called from the Qt thread
|
||||
def set_parameters(self, host, port, protocol, proxy, auto_connect):
|
||||
@ -510,13 +519,11 @@ class Network(util.DaemonThread):
|
||||
self.interfaces.pop(interface.server)
|
||||
if interface.server == self.default_server:
|
||||
self.interface = None
|
||||
while True:
|
||||
if interface.jobs:
|
||||
for i in interface.jobs:
|
||||
if not i.done():
|
||||
await asyncio.sleep(0.1)
|
||||
continue
|
||||
break
|
||||
assert interface.boot_job.done()
|
||||
asyncio.wait_for(i, 3)
|
||||
assert interface.boot_job
|
||||
await asyncio.wait_for(asyncio.shield(interface.boot_job), 3)
|
||||
interface.close()
|
||||
|
||||
def add_recent_server(self, server):
|
||||
@ -699,16 +706,20 @@ class Network(util.DaemonThread):
|
||||
async def connection_down(self, server):
|
||||
'''A connection to server either went down, or was never made.
|
||||
We distinguish by whether it is in self.interfaces.'''
|
||||
self.print_error("connection down", server)
|
||||
self.disconnected_servers.add(server)
|
||||
if server == self.default_server:
|
||||
self.set_status('disconnected')
|
||||
if server in self.interfaces:
|
||||
await self.close_interface(self.interfaces[server])
|
||||
self.notify('interfaces')
|
||||
for b in self.blockchains.values():
|
||||
if b.catch_up == server:
|
||||
b.catch_up = None
|
||||
async with self.connecting[server]:
|
||||
if server in self.disconnected_servers:
|
||||
self.print_error("already disconnected", server)
|
||||
return
|
||||
self.print_error("connection down", server)
|
||||
self.disconnected_servers.add(server)
|
||||
if server == self.default_server:
|
||||
self.set_status('disconnected')
|
||||
if server in self.interfaces:
|
||||
await self.close_interface(self.interfaces[server])
|
||||
self.notify('interfaces')
|
||||
for b in self.blockchains.values():
|
||||
if b.catch_up == server:
|
||||
b.catch_up = None
|
||||
|
||||
async def new_interface(self, server):
|
||||
# todo: get tip first, then decide which checkpoint to use.
|
||||
@ -723,7 +734,8 @@ class Network(util.DaemonThread):
|
||||
interface.jobs = None
|
||||
interface.boot_job = None
|
||||
self.boot_interface(interface)
|
||||
#self.interfaces[server] = interface
|
||||
assert server not in self.interfaces
|
||||
self.interfaces[server] = interface
|
||||
return interface
|
||||
|
||||
async def request_chunk(self, interface, idx):
|
||||
@ -961,26 +973,28 @@ class Network(util.DaemonThread):
|
||||
async def job():
|
||||
try:
|
||||
await self.queue_request('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION], interface)
|
||||
def rem():
|
||||
if not self.stopped:
|
||||
self.connecting.remove(interface.server)
|
||||
if not await interface.send_request():
|
||||
rem()
|
||||
await self.connection_down(interface.server)
|
||||
asyncio.ensure_future(self.connection_down(interface.server))
|
||||
interface.future.set_result("could not send request")
|
||||
return
|
||||
if self.stopped:
|
||||
asyncio.ensure_future(self.connection_down(interface.server))
|
||||
interface.future.set_result("stopped after sending request")
|
||||
return
|
||||
if self.stopped: return
|
||||
try:
|
||||
await asyncio.wait_for(interface.get_response(), 1)
|
||||
except TimeoutError:
|
||||
rem()
|
||||
await self.connection_down(interface.server)
|
||||
asyncio.ensure_future(self.connection_down(interface.server))
|
||||
interface.future.set_result("timeout while getting response")
|
||||
return
|
||||
if self.stopped: return
|
||||
rem()
|
||||
self.interfaces[interface.server] = interface
|
||||
if self.stopped:
|
||||
asyncio.ensure_future(self.connection_down(interface.server))
|
||||
interface.future.set_result("stopped after getting response")
|
||||
return
|
||||
#self.interfaces[interface.server] = interface
|
||||
await self.queue_request('blockchain.headers.subscribe', [], interface)
|
||||
if interface.server == self.default_server:
|
||||
await self.switch_to_interface(interface.server)
|
||||
await asyncio.wait_for(self.switch_to_interface(interface.server), 1)
|
||||
interface.jobs = [asyncio.ensure_future(x) for x in [self.make_ping_job(interface), self.make_send_requests_job(interface), self.make_process_responses_job(interface)]]
|
||||
def cb(num, fut):
|
||||
try:
|
||||
@ -991,6 +1005,8 @@ class Network(util.DaemonThread):
|
||||
if not interface.future.done(): interface.future.set_result(str(num) + " done")
|
||||
for num, i in enumerate(interface.jobs):
|
||||
i.add_done_callback(partial(cb, num))
|
||||
interface.future.set_result("finished")
|
||||
return
|
||||
#self.notify('interfaces')
|
||||
except GeneratorExit:
|
||||
print(interface.server, "GENERATOR EXIT")
|
||||
@ -1037,7 +1053,7 @@ class Network(util.DaemonThread):
|
||||
|
||||
now = time.time()
|
||||
# nodes
|
||||
if len(self.interfaces) + len(self.connecting) < self.num_server:
|
||||
if len(self.interfaces) + sum((1 if x.locked() else 0) for x in self.connecting.values()) < self.num_server:
|
||||
await self.start_random_interface()
|
||||
if now - self.nodes_retry_time > NODES_RETRY_INTERVAL:
|
||||
self.print_error('network: retrying connections')
|
||||
@ -1050,12 +1066,13 @@ class Network(util.DaemonThread):
|
||||
if not self.is_connecting():
|
||||
await self.switch_to_random_interface()
|
||||
else:
|
||||
if self.default_server in self.disconnected_servers:
|
||||
if now - self.server_retry_time > SERVER_RETRY_INTERVAL:
|
||||
self.disconnected_servers.remove(self.default_server)
|
||||
self.server_retry_time = now
|
||||
else:
|
||||
await self.switch_to_interface(self.default_server)
|
||||
async with self.connecting[self.default_server]:
|
||||
if self.default_server in self.disconnected_servers:
|
||||
if now - self.server_retry_time > SERVER_RETRY_INTERVAL:
|
||||
self.disconnected_servers.remove(self.default_server)
|
||||
self.server_retry_time = now
|
||||
else:
|
||||
await self.switch_to_interface(self.default_server)
|
||||
else:
|
||||
if self.config.is_fee_estimates_update_required():
|
||||
await self.request_fee_estimates()
|
||||
@ -1067,6 +1084,7 @@ class Network(util.DaemonThread):
|
||||
self.loop = loop # so we store it in the instance too
|
||||
self.init_headers_file()
|
||||
self.pending_sends = asyncio.Queue()
|
||||
self.connecting = collections.defaultdict(asyncio.Lock)
|
||||
|
||||
async def job():
|
||||
try:
|
||||
@ -1099,7 +1117,7 @@ class Network(util.DaemonThread):
|
||||
for i in asyncio.Task.all_tasks():
|
||||
if asyncio.Task.current_task() == i: continue
|
||||
try:
|
||||
await asyncio.wait_for(asyncio.shield(i), 5)
|
||||
await asyncio.wait_for(asyncio.shield(i), 2)
|
||||
except TimeoutError:
|
||||
print("TOO SLOW TO SHUT DOWN, CANCELLING", i)
|
||||
i.cancel()
|
||||
@ -1114,7 +1132,7 @@ class Network(util.DaemonThread):
|
||||
if not height:
|
||||
return
|
||||
if height < self.max_checkpoint():
|
||||
await self.connection_down(interface)
|
||||
await self.connection_down(interface.server)
|
||||
return
|
||||
interface.tip_header = header
|
||||
interface.tip = height
|
||||
|
||||
Loading…
Reference in New Issue
Block a user