Avoid callback in peer manager
Spawn separate tasks for each peer we test
This commit is contained in:
parent
12c024f3ac
commit
ba607544b9
@ -347,7 +347,7 @@ class PeerManager(object):
|
||||
else:
|
||||
self.logger.info('no proxy detected')
|
||||
|
||||
async def _peer_discovery_loop(self):
|
||||
async def _discover_peers(self):
|
||||
'''Main loop performing peer maintenance. This includes
|
||||
|
||||
1) Forgetting unreachable peers.
|
||||
@ -386,57 +386,46 @@ class PeerManager(object):
|
||||
# Retry a failed connection if enough time has passed
|
||||
return peer.last_try < now - WAKEUP_SECS * 2 ** peer.try_count
|
||||
|
||||
peers = [peer for peer in self.peers if should_retry(peer)]
|
||||
for peer in self.peers:
|
||||
if should_retry(peer):
|
||||
self.tasks.create_task(self._retry_peer(peer))
|
||||
|
||||
for peer in peers:
|
||||
peer.try_count += 1
|
||||
pairs = peer.connection_port_pairs()
|
||||
if peer.bad or not pairs:
|
||||
self._maybe_forget_peer(peer)
|
||||
else:
|
||||
self._retry_peer(peer, pairs)
|
||||
async def _retry_peer(self, peer):
|
||||
peer.try_count += 1
|
||||
|
||||
def _retry_peer(self, peer, port_pairs):
|
||||
peer.last_try = time.time()
|
||||
for kind, port in peer.connection_port_pairs():
|
||||
peer.last_try = time.time()
|
||||
|
||||
kwargs = {'loop': self.loop}
|
||||
kwargs = {}
|
||||
if kind == 'SSL':
|
||||
kwargs['ssl'] = ssl.SSLContext(ssl.PROTOCOL_TLS)
|
||||
|
||||
kind, port = port_pairs[0]
|
||||
if kind == 'SSL':
|
||||
kwargs['ssl'] = ssl.SSLContext(ssl.PROTOCOL_TLS)
|
||||
host = self.env.cs_host(for_rpc=False)
|
||||
if isinstance(host, list):
|
||||
host = host[0]
|
||||
|
||||
host = self.env.cs_host(for_rpc=False)
|
||||
if isinstance(host, list):
|
||||
host = host[0]
|
||||
if self.env.force_proxy or peer.is_tor:
|
||||
if not self.proxy:
|
||||
return
|
||||
kwargs['proxy'] = self.proxy
|
||||
kwargs['resolve'] = not peer.is_tor
|
||||
elif host:
|
||||
# Use our listening Host/IP for outgoing non-proxy
|
||||
# connections so our peers see the correct source.
|
||||
kwargs['local_addr'] = (host, None)
|
||||
|
||||
if self.env.force_proxy or peer.is_tor:
|
||||
if not self.proxy:
|
||||
session = PeerSession(peer, self, kind, peer.host, port, **kwargs)
|
||||
try:
|
||||
await session.create_connection()
|
||||
return
|
||||
kwargs['proxy'] = self.proxy
|
||||
kwargs['resolve'] = not peer.is_tor
|
||||
elif host:
|
||||
# Use our listening Host/IP for outgoing non-proxy
|
||||
# connections so our peers see the correct source.
|
||||
kwargs['local_addr'] = (host, None)
|
||||
except Exception as e:
|
||||
elapsed = time.time() - peer.last_try
|
||||
self.logger.info(f'failed connecting to {peer} at {kind} port '
|
||||
f'{port} in {elapsed:.1f}s: {e}')
|
||||
# Try the next port pair
|
||||
continue
|
||||
|
||||
session = PeerSession(peer, self, kind, peer.host, port, **kwargs)
|
||||
callback = partial(self._on_connected, peer, port_pairs)
|
||||
self.tasks.create_task(session.create_connection(), callback)
|
||||
|
||||
def _on_connected(self, peer, port_pairs, task):
|
||||
'''Called when a connection attempt succeeds or fails.
|
||||
|
||||
If failed, close the session, log it and try remaining port pairs.
|
||||
'''
|
||||
if not task.cancelled() and task.exception():
|
||||
kind, port = port_pairs.pop(0)
|
||||
elapsed = time.time() - peer.last_try
|
||||
self.logger.info(f'failed connecting to {peer} at {kind} port '
|
||||
f'{port} in {elapsed:.1f}s: {task.exception()}')
|
||||
if port_pairs:
|
||||
self._retry_peer(peer, port_pairs)
|
||||
else:
|
||||
self._maybe_forget_peer(peer)
|
||||
self._maybe_forget_peer(peer)
|
||||
|
||||
def _set_verification_status(self, peer, kind, good):
|
||||
'''Called when a verification succeeded or failed.'''
|
||||
@ -478,8 +467,6 @@ class PeerManager(object):
|
||||
self.logger.info('forgetting {} peer: {}'.format(desc, peer))
|
||||
self.peers.discard(peer)
|
||||
|
||||
return forget
|
||||
|
||||
#
|
||||
# External interface
|
||||
#
|
||||
@ -487,7 +474,7 @@ class PeerManager(object):
|
||||
if self.env.peer_discovery == self.env.PD_ON:
|
||||
self.logger.info(f'beginning peer discovery. Force use of '
|
||||
f'proxy: {self.env.force_proxy}')
|
||||
self.tasks.create_task(self._peer_discovery_loop())
|
||||
self.tasks.create_task(self._discover_peers())
|
||||
else:
|
||||
self.logger.info('peer discovery is disabled')
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user