From 79b98b2b544a98a35d8bd94f824a45267a393021 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Tue, 24 Jul 2018 13:36:48 +0800 Subject: [PATCH] Distinguish private and public methods of peer mgr Give private methods a leading _ Remove dead code --- electrumx/server/peers.py | 438 +++++++++++++++++++------------------- 1 file changed, 218 insertions(+), 220 deletions(-) diff --git a/electrumx/server/peers.py b/electrumx/server/peers.py index e356d70..2ffdd77 100644 --- a/electrumx/server/peers.py +++ b/electrumx/server/peers.py @@ -89,13 +89,13 @@ class PeerSession(ClientSession): def fail(self, request, reason): self.logger.error(f'{request} failed: {reason}') - self.peer_mgr.set_verification_status(self.peer, self.kind, False) + self.peer_mgr._set_verification_status(self.peer, self.kind, False) self.close() def bad(self, reason): self.logger.error(f'marking bad: {reason}') self.peer.mark_bad() - self.peer_mgr.set_verification_status(self.peer, self.kind, False) + self.peer_mgr._set_verification_status(self.peer, self.kind, False) self.close() def on_version(self, request): @@ -212,7 +212,7 @@ class PeerSession(ClientSession): self.bad('bad server.peers.subscribe response') return - features = self.peer_mgr.features_to_register(self.peer, peers) + features = self.peer_mgr._features_to_register(self.peer, peers) if features: self.logger.info(f'registering ourself with "server.add_peer"') self.send_request('server.add_peer', [features], @@ -231,7 +231,7 @@ class PeerSession(ClientSession): ''' if not self.all_requests(): self.close() - self.peer_mgr.set_verification_status(self.peer, self.kind, True) + self.peer_mgr._set_verification_status(self.peer, self.kind, True) class PeerManager(object): @@ -264,24 +264,12 @@ class PeerManager(object): self.proxy = None self.last_proxy_try = 0 - def my_clearnet_peer(self): + def _my_clearnet_peer(self): '''Returns the clearnet peer representing this server, if any.''' clearnet = [peer for peer in self.myselves if not peer.is_tor] return clearnet[0] if clearnet else None - def info(self): - '''The number of peers.''' - self.set_peer_statuses() - counter = Counter(peer.status for peer in self.peers) - return { - 'bad': counter[PEER_BAD], - 'good': counter[PEER_GOOD], - 'never': counter[PEER_NEVER], - 'stale': counter[PEER_STALE], - 'total': len(self.peers), - } - - def set_peer_statuses(self): + def _set_peer_statuses(self): '''Set peer statuses.''' cutoff = time.time() - STALE_SECS for peer in self.peers: @@ -294,22 +282,7 @@ class PeerManager(object): else: peer.status = PEER_NEVER - def rpc_data(self): - '''Peer data for the peers RPC method.''' - self.set_peer_statuses() - descs = ['good', 'stale', 'never', 'bad'] - - def peer_data(peer): - data = peer.serialize() - data['status'] = descs[peer.status] - return data - - def peer_key(peer): - return (peer.bad, -peer.last_good) - - return [peer_data(peer) for peer in sorted(self.peers, key=peer_key)] - - def features_to_register(self, peer, remote_peers): + def _features_to_register(self, peer, remote_peers): '''If we should register ourselves to the remote peer, which has reported the given list of known peers, return the clearnet identity features to register, otherwise None. @@ -320,7 +293,7 @@ class PeerManager(object): # are a non-public IP address, or to ourselves. if not self.env.peer_announce or peer in self.myselves: return None - my = self.my_clearnet_peer() + my = self._my_clearnet_peer() if not my or not my.is_public: return None # Register if no matches, or ports have changed @@ -329,6 +302,193 @@ class PeerManager(object): return None return my.features + def _permit_new_onion_peer(self): + '''Accept a new onion peer only once per random time interval.''' + now = time.time() + if now < self.permit_onion_peer_time: + return False + self.permit_onion_peer_time = now + random.randrange(0, 1200) + return True + + def _import_peers(self): + '''Import hard-coded peers from a file or the coin defaults.''' + self.add_peers(self.myselves) + + # Add the hard-coded ones unless only returning self + if self.env.peer_discovery != self.env.PD_SELF: + coin_peers = self.env.coin.PEERS + peers = [Peer.from_real_name(real_name, 'coins.py') + for real_name in coin_peers] + self.add_peers(peers, limit=None) + + async def _maybe_detect_proxy(self): + '''Detect a proxy if we don't have one and some time has passed since + the last attempt. + + If found self.proxy is set to a SOCKSProxy instance, otherwise + None. + ''' + if self.proxy or time.time() - self.last_proxy_try < 900: + return + self.last_proxy_try = time.time() + + host = self.env.tor_proxy_host + if self.env.tor_proxy_port is None: + ports = [9050, 9150, 1080] + else: + ports = [self.env.tor_proxy_port] + self.logger.info(f'trying to detect proxy on "{host}" ports {ports}') + + cls = SOCKSProxy + result = await cls.auto_detect_host(host, ports, None, loop=self.loop) + if isinstance(result, cls): + self.proxy = result + self.logger.info(f'detected {self.proxy}') + + async def _peer_discovery_loop(self): + '''Main loop performing peer maintenance. This includes + + 1) Forgetting unreachable peers. + 2) Verifying connectivity of new peers. + 3) Retrying old peers at regular intervals. + ''' + self._import_peers() + + try: + while True: + await self._maybe_detect_proxy() + await self._retry_peers() + timeout = self.loop.call_later(WAKEUP_SECS, + self.retry_event.set) + await self.retry_event.wait() + self.retry_event.clear() + timeout.cancel() + finally: + for session in list(PeerSession.sessions): + session.abort() + await session.wait_closed() + + async def _retry_peers(self): + '''Retry peers that are close to getting stale.''' + # Exponential backoff of retries + now = time.time() + nearly_stale_time = (now - STALE_SECS) + WAKEUP_SECS * 2 + + def should_retry(peer): + # Retry a peer whose ports might have updated + if peer.other_port_pairs: + return True + # Retry a good connection if it is about to turn stale + if peer.try_count == 0: + return peer.last_good < nearly_stale_time + # Retry a failed connection if enough time has passed + return peer.last_try < now - WAKEUP_SECS * 2 ** peer.try_count + + peers = [peer for peer in self.peers if should_retry(peer)] + + for peer in peers: + peer.try_count += 1 + pairs = peer.connection_port_pairs() + if peer.bad or not pairs: + self._maybe_forget_peer(peer) + else: + self._retry_peer(peer, pairs) + + def _retry_peer(self, peer, port_pairs): + peer.last_try = time.time() + + kwargs = {'loop': self.loop} + + kind, port = port_pairs[0] + if kind == 'SSL': + kwargs['ssl'] = ssl.SSLContext(ssl.PROTOCOL_TLS) + + host = self.env.cs_host(for_rpc=False) + if isinstance(host, list): + host = host[0] + + if self.env.force_proxy or peer.is_tor: + if not self.proxy: + return + kwargs['proxy'] = self.proxy + kwargs['resolve'] = not peer.is_tor + elif host: + # Use our listening Host/IP for outgoing non-proxy + # connections so our peers see the correct source. + kwargs['local_addr'] = (host, None) + + session = PeerSession(peer, self, kind, peer.host, port, **kwargs) + callback = partial(self._on_connected, peer, port_pairs) + self.tasks.create_task(session.create_connection(), callback) + + def _on_connected(self, peer, port_pairs, task): + '''Called when a connection attempt succeeds or fails. + + If failed, close the session, log it and try remaining port pairs. + ''' + if not task.cancelled() and task.exception(): + kind, port = port_pairs.pop(0) + elapsed = time.time() - peer.last_try + self.logger.info(f'failed connecting to {peer} at {kind} port ' + f'{port} in {elapsed:.1f}s: {task.exception()}') + if port_pairs: + self._retry_peer(peer, port_pairs) + else: + self._maybe_forget_peer(peer) + + def _set_verification_status(self, peer, kind, good): + '''Called when a verification succeeded or failed.''' + now = time.time() + if self.env.force_proxy or peer.is_tor: + how = 'via {} over Tor'.format(kind) + else: + how = 'via {} at {}'.format(kind, peer.ip_addr) + status = 'verified' if good else 'failed to verify' + elapsed = now - peer.last_try + self.logger.info(f'{status} {peer} {how} in {elapsed:.1f}s') + + if good: + peer.try_count = 0 + peer.last_good = now + peer.source = 'peer' + # At most 2 matches if we're a host name, potentially several if + # we're an IP address (several instances can share a NAT). + matches = peer.matches(self.peers) + for match in matches: + if match.ip_address: + if len(matches) > 1: + self.peers.remove(match) + elif peer.host in match.features['hosts']: + match.update_features_from_peer(peer) + else: + self._maybe_forget_peer(peer) + + def _maybe_forget_peer(self, peer): + '''Forget the peer if appropriate, e.g. long-term unreachable.''' + if peer.last_good and not peer.bad: + try_limit = 10 + else: + try_limit = 3 + forget = peer.try_count >= try_limit + + if forget: + desc = 'bad' if peer.bad else 'unreachable' + self.logger.info('forgetting {} peer: {}'.format(desc, peer)) + self.peers.discard(peer) + + return forget + + # + # External interface + # + def start_peer_discovery(self): + if self.env.peer_discovery == self.env.PD_ON: + self.logger.info(f'beginning peer discovery. Force use of ' + f'proxy: {self.env.force_proxy}') + self.tasks.create_task(self._peer_discovery_loop()) + else: + self.logger.info('peer discovery is disabled') + def add_peers(self, peers, limit=2, check_ports=False, source=None): '''Add a limited number of peers that are not already present.''' retry = False @@ -362,13 +522,17 @@ class PeerManager(object): if retry: self.retry_event.set() - def permit_new_onion_peer(self): - '''Accept a new onion peer only once per random time interval.''' - now = time.time() - if now < self.permit_onion_peer_time: - return False - self.permit_onion_peer_time = now + random.randrange(0, 1200) - return True + def info(self): + '''The number of peers.''' + self._set_peer_statuses() + counter = Counter(peer.status for peer in self.peers) + return { + 'bad': counter[PEER_BAD], + 'good': counter[PEER_GOOD], + 'never': counter[PEER_NEVER], + 'stale': counter[PEER_STALE], + 'total': len(self.peers), + } async def on_add_peer(self, features, source_info): '''Add a peer (but only if the peer resolves to the source).''' @@ -385,7 +549,7 @@ class PeerManager(object): peer = peers[0] host = peer.host if peer.is_tor: - permit = self.permit_new_onion_peer() + permit = self._permit_new_onion_peer() reason = 'rate limiting' else: try: @@ -444,188 +608,22 @@ class PeerManager(object): return [peer.to_tuple() for peer in peers] - def import_peers(self): - '''Import hard-coded peers from a file or the coin defaults.''' - self.add_peers(self.myselves) - - # Add the hard-coded ones unless only returning self - if self.env.peer_discovery != self.env.PD_SELF: - coin_peers = self.env.coin.PEERS - peers = [Peer.from_real_name(real_name, 'coins.py') - for real_name in coin_peers] - self.add_peers(peers, limit=None) - - async def maybe_detect_proxy(self): - '''Detect a proxy if we don't have one and some time has passed since - the last attempt. - - If found self.proxy is set to a SOCKSProxy instance, otherwise - None. - ''' - if self.proxy or time.time() - self.last_proxy_try < 900: - return - self.last_proxy_try = time.time() - - host = self.env.tor_proxy_host - if self.env.tor_proxy_port is None: - ports = [9050, 9150, 1080] - else: - ports = [self.env.tor_proxy_port] - self.logger.info(f'trying to detect proxy on "{host}" ports {ports}') - - cls = SOCKSProxy - result = await cls.auto_detect_host(host, ports, None, loop=self.loop) - if isinstance(result, cls): - self.proxy = result - self.logger.info(f'detected {self.proxy}') - def proxy_peername(self): '''Return the peername of the proxy, if there is a proxy, otherwise None.''' return self.proxy.peername if self.proxy else None - def start_peer_discovery(self): - if self.env.peer_discovery == self.env.PD_ON: - self.logger.info(f'beginning peer discovery. Force use of ' - f'proxy: {self.env.force_proxy}') - self.tasks.create_task(self.peer_discovery_loop()) - else: - self.logger.info('peer discovery is disabled') + def rpc_data(self): + '''Peer data for the peers RPC method.''' + self._set_peer_statuses() + descs = ['good', 'stale', 'never', 'bad'] - async def peer_discovery_loop(self): - '''Main loop performing peer maintenance. This includes + def peer_data(peer): + data = peer.serialize() + data['status'] = descs[peer.status] + return data - 1) Forgetting unreachable peers. - 2) Verifying connectivity of new peers. - 3) Retrying old peers at regular intervals. - ''' - self.import_peers() + def peer_key(peer): + return (peer.bad, -peer.last_good) - try: - while True: - await self.maybe_detect_proxy() - await self.retry_peers() - timeout = self.loop.call_later(WAKEUP_SECS, - self.retry_event.set) - await self.retry_event.wait() - self.retry_event.clear() - timeout.cancel() - finally: - for session in list(PeerSession.sessions): - session.abort() - await session.wait_closed() - - def is_coin_onion_peer(self, peer): - '''Return true if this peer is a hard-coded onion peer.''' - return peer.is_tor and any(peer.host in real_name - for real_name in self.env.coin.PEERS) - - async def retry_peers(self): - '''Retry peers that are close to getting stale.''' - # Exponential backoff of retries - now = time.time() - nearly_stale_time = (now - STALE_SECS) + WAKEUP_SECS * 2 - - def should_retry(peer): - # Retry a peer whose ports might have updated - if peer.other_port_pairs: - return True - # Retry a good connection if it is about to turn stale - if peer.try_count == 0: - return peer.last_good < nearly_stale_time - # Retry a failed connection if enough time has passed - return peer.last_try < now - WAKEUP_SECS * 2 ** peer.try_count - - peers = [peer for peer in self.peers if should_retry(peer)] - - for peer in peers: - peer.try_count += 1 - pairs = peer.connection_port_pairs() - if peer.bad or not pairs: - self.maybe_forget_peer(peer) - else: - self.retry_peer(peer, pairs) - - def retry_peer(self, peer, port_pairs): - peer.last_try = time.time() - - kwargs = {'loop': self.loop} - - kind, port = port_pairs[0] - if kind == 'SSL': - kwargs['ssl'] = ssl.SSLContext(ssl.PROTOCOL_TLS) - - host = self.env.cs_host(for_rpc=False) - if isinstance(host, list): - host = host[0] - - if self.env.force_proxy or peer.is_tor: - if not self.proxy: - return - kwargs['proxy'] = self.proxy - kwargs['resolve'] = not peer.is_tor - elif host: - # Use our listening Host/IP for outgoing non-proxy - # connections so our peers see the correct source. - kwargs['local_addr'] = (host, None) - - session = PeerSession(peer, self, kind, peer.host, port, **kwargs) - callback = partial(self.on_connected, peer, port_pairs) - self.tasks.create_task(session.create_connection(), callback) - - def on_connected(self, peer, port_pairs, task): - '''Called when a connection attempt succeeds or fails. - - If failed, close the session, log it and try remaining port pairs. - ''' - if not task.cancelled() and task.exception(): - kind, port = port_pairs.pop(0) - elapsed = time.time() - peer.last_try - self.logger.info(f'failed connecting to {peer} at {kind} port ' - f'{port} in {elapsed:.1f}s: {task.exception()}') - if port_pairs: - self.retry_peer(peer, port_pairs) - else: - self.maybe_forget_peer(peer) - - def set_verification_status(self, peer, kind, good): - '''Called when a verification succeeded or failed.''' - now = time.time() - if self.env.force_proxy or peer.is_tor: - how = 'via {} over Tor'.format(kind) - else: - how = 'via {} at {}'.format(kind, peer.ip_addr) - status = 'verified' if good else 'failed to verify' - elapsed = now - peer.last_try - self.logger.info(f'{status} {peer} {how} in {elapsed:.1f}s') - - if good: - peer.try_count = 0 - peer.last_good = now - peer.source = 'peer' - # At most 2 matches if we're a host name, potentially several if - # we're an IP address (several instances can share a NAT). - matches = peer.matches(self.peers) - for match in matches: - if match.ip_address: - if len(matches) > 1: - self.peers.remove(match) - elif peer.host in match.features['hosts']: - match.update_features_from_peer(peer) - else: - self.maybe_forget_peer(peer) - - def maybe_forget_peer(self, peer): - '''Forget the peer if appropriate, e.g. long-term unreachable.''' - if peer.last_good and not peer.bad: - try_limit = 10 - else: - try_limit = 3 - forget = peer.try_count >= try_limit - - if forget: - desc = 'bad' if peer.bad else 'unreachable' - self.logger.info('forgetting {} peer: {}'.format(desc, peer)) - self.peers.discard(peer) - - return forget + return [peer_data(peer) for peer in sorted(self.peers, key=peer_key)]