diff --git a/electrumx/server/peers.py b/electrumx/server/peers.py index 0079efc..5367bb4 100644 --- a/electrumx/server/peers.py +++ b/electrumx/server/peers.py @@ -347,7 +347,7 @@ class PeerManager(object): else: self.logger.info('no proxy detected') - async def _peer_discovery_loop(self): + async def _discover_peers(self): '''Main loop performing peer maintenance. This includes 1) Forgetting unreachable peers. @@ -386,57 +386,46 @@ class PeerManager(object): # Retry a failed connection if enough time has passed return peer.last_try < now - WAKEUP_SECS * 2 ** peer.try_count - peers = [peer for peer in self.peers if should_retry(peer)] + for peer in self.peers: + if should_retry(peer): + self.tasks.create_task(self._retry_peer(peer)) - for peer in peers: - peer.try_count += 1 - pairs = peer.connection_port_pairs() - if peer.bad or not pairs: - self._maybe_forget_peer(peer) - else: - self._retry_peer(peer, pairs) + async def _retry_peer(self, peer): + peer.try_count += 1 - def _retry_peer(self, peer, port_pairs): - peer.last_try = time.time() + for kind, port in peer.connection_port_pairs(): + peer.last_try = time.time() - kwargs = {'loop': self.loop} + kwargs = {} + if kind == 'SSL': + kwargs['ssl'] = ssl.SSLContext(ssl.PROTOCOL_TLS) - kind, port = port_pairs[0] - if kind == 'SSL': - kwargs['ssl'] = ssl.SSLContext(ssl.PROTOCOL_TLS) + host = self.env.cs_host(for_rpc=False) + if isinstance(host, list): + host = host[0] - host = self.env.cs_host(for_rpc=False) - if isinstance(host, list): - host = host[0] + if self.env.force_proxy or peer.is_tor: + if not self.proxy: + return + kwargs['proxy'] = self.proxy + kwargs['resolve'] = not peer.is_tor + elif host: + # Use our listening Host/IP for outgoing non-proxy + # connections so our peers see the correct source. + kwargs['local_addr'] = (host, None) - if self.env.force_proxy or peer.is_tor: - if not self.proxy: + session = PeerSession(peer, self, kind, peer.host, port, **kwargs) + try: + await session.create_connection() return - kwargs['proxy'] = self.proxy - kwargs['resolve'] = not peer.is_tor - elif host: - # Use our listening Host/IP for outgoing non-proxy - # connections so our peers see the correct source. - kwargs['local_addr'] = (host, None) + except Exception as e: + elapsed = time.time() - peer.last_try + self.logger.info(f'failed connecting to {peer} at {kind} port ' + f'{port} in {elapsed:.1f}s: {e}') + # Try the next port pair + continue - session = PeerSession(peer, self, kind, peer.host, port, **kwargs) - callback = partial(self._on_connected, peer, port_pairs) - self.tasks.create_task(session.create_connection(), callback) - - def _on_connected(self, peer, port_pairs, task): - '''Called when a connection attempt succeeds or fails. - - If failed, close the session, log it and try remaining port pairs. - ''' - if not task.cancelled() and task.exception(): - kind, port = port_pairs.pop(0) - elapsed = time.time() - peer.last_try - self.logger.info(f'failed connecting to {peer} at {kind} port ' - f'{port} in {elapsed:.1f}s: {task.exception()}') - if port_pairs: - self._retry_peer(peer, port_pairs) - else: - self._maybe_forget_peer(peer) + self._maybe_forget_peer(peer) def _set_verification_status(self, peer, kind, good): '''Called when a verification succeeded or failed.''' @@ -478,8 +467,6 @@ class PeerManager(object): self.logger.info('forgetting {} peer: {}'.format(desc, peer)) self.peers.discard(peer) - return forget - # # External interface # @@ -487,7 +474,7 @@ class PeerManager(object): if self.env.peer_discovery == self.env.PD_ON: self.logger.info(f'beginning peer discovery. Force use of ' f'proxy: {self.env.force_proxy}') - self.tasks.create_task(self._peer_discovery_loop()) + self.tasks.create_task(self._discover_peers()) else: self.logger.info('peer discovery is disabled')