Have one peer monitoring task per peer

This commit is contained in:
Neil Booth 2018-07-28 16:10:36 +08:00
parent 751f9917a4
commit d962c97ef1
2 changed files with 114 additions and 132 deletions

View File

@ -21,7 +21,6 @@ from aiorpcx import (ClientSession, SOCKSProxy, SOCKSError,
from electrumx.lib.peer import Peer
from electrumx.lib.util import class_logger, protocol_tuple
PEER_GOOD, PEER_STALE, PEER_NEVER, PEER_BAD = range(4)
STALE_SECS = 24 * 3600
WAKEUP_SECS = 300
@ -68,7 +67,6 @@ class PeerManager(object):
self.myselves = [Peer(ident.host, sclass.server_features(env), 'env')
for ident in env.identities]
self.server_version_args = sclass.server_version_args()
self.retry_event = asyncio.Event()
# Peers have one entry per hostname. Once connected, the
# ip_addr property is either None, an onion peer, or the
# IP address that was connected to. Adding a peer will evict
@ -76,7 +74,7 @@ class PeerManager(object):
self.peers = set()
self.permit_onion_peer_time = time.time()
self.proxy = None
self.last_proxy_try = 0
self.task_group = None
def _my_clearnet_peer(self):
'''Returns the clearnet peer representing this server, if any.'''
@ -101,8 +99,6 @@ class PeerManager(object):
reported the given list of known peers, return the clearnet
identity features to register, otherwise None.
'''
self.add_peers(remote_peers)
# Announce ourself if not present. Don't if disabled, we
# are a non-public IP address, or to ourselves.
if not self.env.peer_announce or peer in self.myselves:
@ -124,68 +120,125 @@ class PeerManager(object):
self.permit_onion_peer_time = now + random.randrange(0, 1200)
return True
def _import_peers(self):
async def _import_peers(self):
'''Import hard-coded peers from a file or the coin defaults.'''
self.add_peers(self.myselves)
# Add the hard-coded ones unless only returning self
imported_peers = self.myselves.copy()
# Add the hard-coded ones unless only reporting ourself
if self.env.peer_discovery != self.env.PD_SELF:
coin_peers = self.env.coin.PEERS
peers = [Peer.from_real_name(real_name, 'coins.py')
for real_name in coin_peers]
self.add_peers(peers, limit=None)
imported_peers.extend(Peer.from_real_name(real_name, 'coins.py')
for real_name in self.env.coin.PEERS)
await self._note_peers(imported_peers, limit=None)
async def _maybe_detect_proxy(self):
async def _detect_proxy(self):
'''Detect a proxy if we don't have one and some time has passed since
the last attempt.
If found self.proxy is set to a SOCKSProxy instance, otherwise
None.
'''
if self.proxy or time.time() - self.last_proxy_try < 900:
return
self.last_proxy_try = time.time()
host = self.env.tor_proxy_host
if self.env.tor_proxy_port is None:
ports = [9050, 9150, 1080]
else:
ports = [self.env.tor_proxy_port]
self.logger.info(f'trying to detect proxy on "{host}" ports {ports}')
while True:
self.logger.info(f'trying to detect proxy on "{host}" '
f'ports {ports}')
proxy = await SOCKSProxy.auto_detect_host(host, ports, None)
if proxy:
self.proxy = proxy
self.logger.info(f'detected {proxy}')
return
self.logger.info('no proxy detected, will try later')
await asyncio.sleep(900)
cls = SOCKSProxy
result = await cls.auto_detect_host(host, ports, None)
if isinstance(result, cls):
self.proxy = result
self.logger.info(f'detected {self.proxy}')
else:
self.logger.info('no proxy detected')
async def _note_peers(self, peers, limit=2, check_ports=False,
source=None):
'''Add a limited number of peers that are not already present.'''
new_peers = []
for peer in peers:
if not peer.is_public or (peer.is_tor and not self.proxy):
continue
async def _retry_peers(self):
'''Retry peers that are close to getting stale.'''
# Exponential backoff of retries
now = time.time()
nearly_stale_time = (now - STALE_SECS) + WAKEUP_SECS * 2
matches = peer.matches(self.peers)
if not matches:
new_peers.append(peer)
elif check_ports:
for match in matches:
if match.check_ports(peer):
self.logger.info(f'ports changed for {peer}')
peer.retry_event.set()
def should_retry(peer):
# Retry a peer whose ports might have updated
if peer.other_port_pairs:
return True
# Retry a good connection if it is about to turn stale
if new_peers:
source = source or new_peers[0].source
if limit:
random.shuffle(new_peers)
use_peers = new_peers[:limit]
else:
use_peers = new_peers
for peer in use_peers:
self.logger.info(f'accepted new peer {peer} from {source}')
peer.retry_event = asyncio.Event()
self.peers.add(peer)
await self.task_group.spawn(self._monitor_peer(peer))
async def _monitor_peer(self, peer):
# Stop monitoring if we were dropped (a duplicate peer)
while peer in self.peers:
is_good = await self._is_peer_good(peer)
if self._should_drop_peer(peer, is_good):
break
# Figure out how long to sleep before retrying. Retry a
# good connection when it is about to turn stale, otherwise
# exponentially back off retries.
if peer.try_count == 0:
return peer.last_good < nearly_stale_time
# Retry a failed connection if enough time has passed
return peer.last_try < now - WAKEUP_SECS * 2 ** peer.try_count
pause = STALE_SECS - WAKEUP_SECS * 2
else:
pause = WAKEUP_SECS * 2 ** peer.try_count
async with ignore_after(pause):
await peer.retry_event.wait()
async with TaskGroup() as group:
for peer in self.peers:
if should_retry(peer):
await group.spawn(self._retry_peer(peer))
async def _should_drop_peer(self, peer, is_good):
now = time.time()
if self.env.force_proxy or peer.is_tor:
how = f'via {kind} over Tor'
else:
how = f'via {kind} at {peer.ip_addr}'
status = 'verified' if good else 'failed to verify'
elapsed = now - peer.last_try
self.logger.info(f'{status} {peer} {how} in {elapsed:.1f}s')
async def _retry_peer(self, peer):
if good:
peer.try_count = 0
peer.last_good = now
peer.source = 'peer'
# At most 2 matches if we're a host name, potentially
# several if we're an IP address (several instances
# can share a NAT).
matches = peer.matches(self.peers)
for match in matches:
if match.ip_address:
if len(matches) > 1:
self.peers.remove(match)
# Force the peer's monitoring task to exit
match.retry_event.set()
elif peer.host in match.features['hosts']:
match.update_features_from_peer(peer)
else:
# Forget the peer if long-term unreachable
if peer.last_good and not peer.bad:
try_limit = 10
else:
try_limit = 3
if peer.try_count >= try_limit:
desc = 'bad' if peer.bad else 'unreachable'
self.logger.info(f'forgetting {desc} peer: {peer}')
self.peers.discard(peer)
return True
return False
async def _is_peer_good(self, peer):
peer.try_count += 1
success = False
for kind, port in peer.connection_port_pairs():
peer.last_try = time.time()
@ -210,8 +263,7 @@ class PeerManager(object):
try:
async with PeerSession(peer.host, port, **kwargs) as session:
await self._verify_peer(session, peer)
success = True
break
return True
except BadPeerError as e:
self.logger.error(f'[{peer}] marking bad: ({e})')
peer.mark_bad()
@ -225,18 +277,6 @@ class PeerManager(object):
self.logger.info(f'[{peer}] {kind} connection to '
f'port {port} failed: {e}')
self._set_verification_status(peer, kind, success)
# Forget the peer if appropriate, e.g. long-term unreachable
if not success:
if peer.last_good and not peer.bad:
try_limit = 10
else:
try_limit = 3
if peer.try_count >= try_limit:
desc = 'bad' if peer.bad else 'unreachable'
self.logger.info(f'forgetting {desc} peer: {peer}')
self.peers.discard(peer)
async def _verify_peer(self, session, peer):
if not peer.is_tor:
address = session.peer_address()
@ -335,6 +375,7 @@ class PeerManager(object):
except Exception:
raise BadPeerError('bad server.peers.subscribe response')
await self._note_peers(peers)
features = self._features_to_register(peer, peers)
if not features:
return
@ -343,35 +384,10 @@ class PeerManager(object):
await session.send_request('server.add_peer', [features],
timeout=timeout)
def _set_verification_status(self, peer, kind, good):
    '''Record the outcome of a peer verification attempt.

    peer: the Peer that was contacted.
    kind: the connection kind that was tried (used only for logging).
    good: True if verification succeeded, False otherwise.
    '''
    now = time.time()
    # Describe how the peer was reached, for the log line below.
    if self.env.force_proxy or peer.is_tor:
        how = 'via {} over Tor'.format(kind)
    else:
        how = 'via {} at {}'.format(kind, peer.ip_addr)
    status = 'verified' if good else 'failed to verify'
    elapsed = now - peer.last_try
    self.logger.info(f'{status} {peer} {how} in {elapsed:.1f}s')
    if good:
        # Success: reset the retry backoff counter and record when the
        # peer was last known good.
        peer.try_count = 0
        peer.last_good = now
        peer.source = 'peer'
        # At most 2 matches if we're a host name, potentially several if
        # we're an IP address (several instances can share a NAT).
        matches = peer.matches(self.peers)
        for match in matches:
            if match.ip_address:
                # Drop a duplicate IP-address entry when this verified
                # peer supersedes it.
                if len(matches) > 1:
                    self.peers.remove(match)
            elif peer.host in match.features['hosts']:
                match.update_features_from_peer(peer)
#
# External interface
#
async def discover_peers(self):
async def discover_peers(self, task_group):
'''Perform peer maintenance. This includes
1) Forgetting unreachable peers.
@ -384,46 +400,9 @@ class PeerManager(object):
self.logger.info(f'beginning peer discovery. Force use of '
f'proxy: {self.env.force_proxy}')
self._import_peers()
while True:
await self._maybe_detect_proxy()
await self._retry_peers()
async with ignore_after(WAKEUP_SECS):
await self.retry_event.wait()
self.retry_event.clear()
def add_peers(self, peers, limit=2, check_ports=False, source=None):
    '''Add a limited number of peers that are not already present.

    peers: iterable of candidate Peer objects.
    limit: keep at most this many new peers (falsy means no limit).
    check_ports: if True, log when a known peer's ports have changed
        and schedule a retry.
    source: description of where the peers came from; defaults to the
        source of the first accepted peer.
    '''
    retry = False
    new_peers = []
    for peer in peers:
        # Ignore non-public peers, and Tor peers when we have no proxy
        # to reach them through.
        if not peer.is_public or (peer.is_tor and not self.proxy):
            continue
        matches = peer.matches(self.peers)
        if not matches:
            new_peers.append(peer)
        elif check_ports:
            for match in matches:
                if match.check_ports(peer):
                    self.logger.info(f'ports changed for {peer}')
                    retry = True
    if new_peers:
        retry = True
        source = source or new_peers[0].source
        if limit:
            # Take a random sample to avoid bias towards list order.
            random.shuffle(new_peers)
            use_peers = new_peers[:limit]
        else:
            use_peers = new_peers
        for peer in use_peers:
            self.logger.info(f'accepted new peer {peer} from {source}')
        self.peers.update(use_peers)
    if retry:
        # Wake the discovery loop so new or changed peers are tried
        # without waiting for the next periodic wakeup.
        self.retry_event.set()
self.task_group = task_group
await task_group.spawn(self._detect_proxy())
await task_group.spawn(self._import_peers())
def info(self):
'''The number of peers.'''
@ -437,6 +416,10 @@ class PeerManager(object):
'total': len(self.peers),
}
async def add_localRPC_peer(self, real_name):
    '''Add a peer passed by the admin over LocalRPC.'''
    peer = Peer.from_real_name(real_name, 'RPC')
    await self._note_peers([peer])
async def on_add_peer(self, features, source_info):
'''Add a peer (but only if the peer resolves to the source).'''
if not source_info:
@ -468,7 +451,7 @@ class PeerManager(object):
if permit:
self.logger.info(f'accepted add_peer request from {source} '
f'for {host}')
self.add_peers([peer], check_ports=True)
await self._note_peers([peer], check_ports=True)
else:
self.logger.warning(f'rejected add_peer request from {source} '
f'for {host} ({reason})')

View File

@ -326,13 +326,12 @@ class SessionManager(object):
# --- LocalRPC command handlers
def rpc_add_peer(self, real_name):
async def rpc_add_peer(self, real_name):
'''Add a peer.
real_name: a real name, as would appear on IRC
real_name: "bch.electrumx.cash t50001 s50002" for example
'''
peer = Peer.from_real_name(real_name, 'RPC')
self.peer_mgr.add_peers([peer])
await self.peer_mgr.add_localRPC_peer(real_name)
return "peer '{}' added".format(real_name)
def rpc_disconnect(self, session_ids):
@ -422,7 +421,7 @@ class SessionManager(object):
# Peer discovery should start after the external servers
# because we connect to ourself
async with TaskGroup() as group:
await group.spawn(self.peer_mgr.discover_peers())
await group.spawn(self.peer_mgr.discover_peers(group))
await group.spawn(self._housekeeping())
finally:
# Close servers and sessions