diff --git a/electrumx/server/peers.py b/electrumx/server/peers.py index f7b0ee7..0f672ac 100644 --- a/electrumx/server/peers.py +++ b/electrumx/server/peers.py @@ -21,7 +21,6 @@ from aiorpcx import (ClientSession, SOCKSProxy, SOCKSError, from electrumx.lib.peer import Peer from electrumx.lib.util import class_logger, protocol_tuple - PEER_GOOD, PEER_STALE, PEER_NEVER, PEER_BAD = range(4) STALE_SECS = 24 * 3600 WAKEUP_SECS = 300 @@ -68,7 +67,6 @@ class PeerManager(object): self.myselves = [Peer(ident.host, sclass.server_features(env), 'env') for ident in env.identities] self.server_version_args = sclass.server_version_args() - self.retry_event = asyncio.Event() # Peers have one entry per hostname. Once connected, the # ip_addr property is either None, an onion peer, or the # IP address that was connected to. Adding a peer will evict @@ -76,7 +74,7 @@ class PeerManager(object): self.peers = set() self.permit_onion_peer_time = time.time() self.proxy = None - self.last_proxy_try = 0 + self.task_group = None def _my_clearnet_peer(self): '''Returns the clearnet peer representing this server, if any.''' @@ -101,8 +99,6 @@ class PeerManager(object): reported the given list of known peers, return the clearnet identity features to register, otherwise None. ''' - self.add_peers(remote_peers) - # Announce ourself if not present. Don't if disabled, we # are a non-public IP address, or to ourselves. if not self.env.peer_announce or peer in self.myselves: @@ -124,68 +120,125 @@ class PeerManager(object): self.permit_onion_peer_time = now + random.randrange(0, 1200) return True - def _import_peers(self): + async def _import_peers(self): '''Import hard-coded peers from a file or the coin defaults.''' - self.add_peers(self.myselves) - - # Add the hard-coded ones unless only returning self + imported_peers = self.myselves.copy() + # Add the hard-coded ones unless only reporting ourself if self.env.peer_discovery != self.env.PD_SELF: - coin_peers = self.env.coin.PEERS - peers = [Peer.from_real_name(real_name, 'coins.py') - for real_name in coin_peers] - self.add_peers(peers, limit=None) + imported_peers.extend(Peer.from_real_name(real_name, 'coins.py') + for real_name in self.env.coin.PEERS) + await self._note_peers(imported_peers, limit=None) - async def _maybe_detect_proxy(self): + async def _detect_proxy(self): '''Detect a proxy if we don't have one and some time has passed since the last attempt. If found self.proxy is set to a SOCKSProxy instance, otherwise None. ''' - if self.proxy or time.time() - self.last_proxy_try < 900: - return - self.last_proxy_try = time.time() - host = self.env.tor_proxy_host if self.env.tor_proxy_port is None: ports = [9050, 9150, 1080] else: ports = [self.env.tor_proxy_port] - self.logger.info(f'trying to detect proxy on "{host}" ports {ports}') + while True: + self.logger.info(f'trying to detect proxy on "{host}" ' + f'ports {ports}') + proxy = await SOCKSProxy.auto_detect_host(host, ports, None) + if proxy: + self.proxy = proxy + self.logger.info(f'detected {proxy}') + return + self.logger.info('no proxy detected, will try later') + await asyncio.sleep(900) - cls = SOCKSProxy - result = await cls.auto_detect_host(host, ports, None) - if isinstance(result, cls): - self.proxy = result - self.logger.info(f'detected {self.proxy}') - else: - self.logger.info('no proxy detected') + async def _note_peers(self, peers, limit=2, check_ports=False, + source=None): + '''Add a limited number of peers that are not already present.''' + new_peers = [] + for peer in peers: + if not peer.is_public or (peer.is_tor and not self.proxy): + continue - async def _retry_peers(self): - '''Retry peers that are close to getting stale.''' - # Exponential backoff of retries - now = time.time() - nearly_stale_time = (now - STALE_SECS) + WAKEUP_SECS * 2 + matches = peer.matches(self.peers) + if not matches: + new_peers.append(peer) + elif check_ports: + for match in matches: + if match.check_ports(peer): + self.logger.info(f'ports changed for {peer}') + peer.retry_event.set() - def should_retry(peer): - # Retry a peer whose ports might have updated - if peer.other_port_pairs: - return True - # Retry a good connection if it is about to turn stale + if new_peers: + source = source or new_peers[0].source + if limit: + random.shuffle(new_peers) + use_peers = new_peers[:limit] + else: + use_peers = new_peers + for peer in use_peers: + self.logger.info(f'accepted new peer {peer} from {source}') + peer.retry_event = asyncio.Event() + self.peers.add(peer) + await self.task_group.spawn(self._monitor_peer(peer)) + + async def _monitor_peer(self, peer): + # Stop monitoring if we were dropped (a duplicate peer) + while peer in self.peers: + is_good = await self._is_peer_good(peer) + if self._should_drop_peer(peer, is_good): + break + # Figure out how long to sleep before retrying. Retry a + # good connection when it is about to turn stale, otherwise + # exponentially back off retries. if peer.try_count == 0: - return peer.last_good < nearly_stale_time - # Retry a failed connection if enough time has passed - return peer.last_try < now - WAKEUP_SECS * 2 ** peer.try_count + pause = STALE_SECS - WAKEUP_SECS * 2 + else: + pause = WAKEUP_SECS * 2 ** peer.try_count + async with ignore_after(pause): + await peer.retry_event.wait() - async with TaskGroup() as group: - for peer in self.peers: - if should_retry(peer): - await group.spawn(self._retry_peer(peer)) + async def _should_drop_peer(self, peer, is_good): + now = time.time() + if self.env.force_proxy or peer.is_tor: + how = f'via {kind} over Tor' + else: + how = f'via {kind} at {peer.ip_addr}' + status = 'verified' if good else 'failed to verify' + elapsed = now - peer.last_try + self.logger.info(f'{status} {peer} {how} in {elapsed:.1f}s') - async def _retry_peer(self, peer): + if good: + peer.try_count = 0 + peer.last_good = now + peer.source = 'peer' + # At most 2 matches if we're a host name, potentially + # several if we're an IP address (several instances + # can share a NAT). + matches = peer.matches(self.peers) + for match in matches: + if match.ip_address: + if len(matches) > 1: + self.peers.remove(match) + # Force the peer's monitoring task to exit + match.retry_event.set() + elif peer.host in match.features['hosts']: + match.update_features_from_peer(peer) + else: + # Forget the peer if long-term unreachable + if peer.last_good and not peer.bad: + try_limit = 10 + else: + try_limit = 3 + if peer.try_count >= try_limit: + desc = 'bad' if peer.bad else 'unreachable' + self.logger.info(f'forgetting {desc} peer: {peer}') + self.peers.discard(peer) + return True + return False + + async def _is_peer_good(self, peer): peer.try_count += 1 - success = False - for kind, port in peer.connection_port_pairs(): peer.last_try = time.time() @@ -210,8 +263,7 @@ class PeerManager(object): try: async with PeerSession(peer.host, port, **kwargs) as session: await self._verify_peer(session, peer) - success = True - break + return True except BadPeerError as e: self.logger.error(f'[{peer}] marking bad: ({e})') peer.mark_bad() @@ -225,18 +277,6 @@ class PeerManager(object): self.logger.info(f'[{peer}] {kind} connection to ' f'port {port} failed: {e}') - self._set_verification_status(peer, kind, success) - # Forget the peer if appropriate, e.g. long-term unreachable - if not success: - if peer.last_good and not peer.bad: - try_limit = 10 - else: - try_limit = 3 - if peer.try_count >= try_limit: - desc = 'bad' if peer.bad else 'unreachable' - self.logger.info(f'forgetting {desc} peer: {peer}') - self.peers.discard(peer) - async def _verify_peer(self, session, peer): if not peer.is_tor: address = session.peer_address() @@ -335,6 +375,7 @@ class PeerManager(object): except Exception: raise BadPeerError('bad server.peers.subscribe response') + await self._note_peers(peers) features = self._features_to_register(peer, peers) if not features: return @@ -343,35 +384,10 @@ class PeerManager(object): await session.send_request('server.add_peer', [features], timeout=timeout) - def _set_verification_status(self, peer, kind, good): - '''Called when a verification succeeded or failed.''' - now = time.time() - if self.env.force_proxy or peer.is_tor: - how = 'via {} over Tor'.format(kind) - else: - how = 'via {} at {}'.format(kind, peer.ip_addr) - status = 'verified' if good else 'failed to verify' - elapsed = now - peer.last_try - self.logger.info(f'{status} {peer} {how} in {elapsed:.1f}s') - - if good: - peer.try_count = 0 - peer.last_good = now - peer.source = 'peer' - # At most 2 matches if we're a host name, potentially several if - # we're an IP address (several instances can share a NAT). - matches = peer.matches(self.peers) - for match in matches: - if match.ip_address: - if len(matches) > 1: - self.peers.remove(match) - elif peer.host in match.features['hosts']: - match.update_features_from_peer(peer) - # # External interface # - async def discover_peers(self): + async def discover_peers(self, task_group): '''Perform peer maintenance. This includes 1) Forgetting unreachable peers. @@ -384,46 +400,9 @@ class PeerManager(object): self.logger.info(f'beginning peer discovery. Force use of ' f'proxy: {self.env.force_proxy}') - self._import_peers() - - while True: - await self._maybe_detect_proxy() - await self._retry_peers() - async with ignore_after(WAKEUP_SECS): - await self.retry_event.wait() - self.retry_event.clear() - - def add_peers(self, peers, limit=2, check_ports=False, source=None): - '''Add a limited number of peers that are not already present.''' - retry = False - new_peers = [] - for peer in peers: - if not peer.is_public or (peer.is_tor and not self.proxy): - continue - - matches = peer.matches(self.peers) - if not matches: - new_peers.append(peer) - elif check_ports: - for match in matches: - if match.check_ports(peer): - self.logger.info(f'ports changed for {peer}') - retry = True - - if new_peers: - retry = True - source = source or new_peers[0].source - if limit: - random.shuffle(new_peers) - use_peers = new_peers[:limit] - else: - use_peers = new_peers - for peer in use_peers: - self.logger.info(f'accepted new peer {peer} from {source}') - self.peers.update(use_peers) - - if retry: - self.retry_event.set() + self.task_group = task_group + await task_group.spawn(self._detect_proxy()) + await task_group.spawn(self._import_peers()) def info(self): '''The number of peers.''' @@ -437,6 +416,10 @@ class PeerManager(object): 'total': len(self.peers), } + async def add_localRPC_peer(self, real_name): + '''Add a peer passed by the admin over LocalRPC.''' + await self._note_peers([Peer.from_real_name(real_name, 'RPC')]) + async def on_add_peer(self, features, source_info): '''Add a peer (but only if the peer resolves to the source).''' if not source_info: @@ -468,7 +451,7 @@ class PeerManager(object): if permit: self.logger.info(f'accepted add_peer request from {source} ' f'for {host}') - self.add_peers([peer], check_ports=True) + await self._note_peers([peer], check_ports=True) else: self.logger.warning(f'rejected add_peer request from {source} ' f'for {host} ({reason})') diff --git a/electrumx/server/session.py b/electrumx/server/session.py index 8ce68fd..4da5061 100644 --- a/electrumx/server/session.py +++ b/electrumx/server/session.py @@ -326,13 +326,12 @@ class SessionManager(object): # --- LocalRPC command handlers - def rpc_add_peer(self, real_name): + async def rpc_add_peer(self, real_name): '''Add a peer. - real_name: a real name, as would appear on IRC + real_name: "bch.electrumx.cash t50001 s50002" for example ''' - peer = Peer.from_real_name(real_name, 'RPC') - self.peer_mgr.add_peers([peer]) + await self.peer_mgr.add_localRPC_peer(real_name) return "peer '{}' added".format(real_name) def rpc_disconnect(self, session_ids): @@ -422,7 +421,7 @@ class SessionManager(object): # Peer discovery should start after the external servers # because we connect to ourself async with TaskGroup() as group: - await group.spawn(self.peer_mgr.discover_peers()) + await group.spawn(self.peer_mgr.discover_peers(group)) await group.spawn(self._housekeeping()) finally: # Close servers and sessions