Distinguish private and public methods of peer mgr
Give private methods a leading _ Remove dead code
This commit is contained in:
parent
92ddb52f63
commit
79b98b2b54
@ -89,13 +89,13 @@ class PeerSession(ClientSession):
|
||||
|
||||
def fail(self, request, reason):
|
||||
self.logger.error(f'{request} failed: {reason}')
|
||||
self.peer_mgr.set_verification_status(self.peer, self.kind, False)
|
||||
self.peer_mgr._set_verification_status(self.peer, self.kind, False)
|
||||
self.close()
|
||||
|
||||
def bad(self, reason):
|
||||
self.logger.error(f'marking bad: {reason}')
|
||||
self.peer.mark_bad()
|
||||
self.peer_mgr.set_verification_status(self.peer, self.kind, False)
|
||||
self.peer_mgr._set_verification_status(self.peer, self.kind, False)
|
||||
self.close()
|
||||
|
||||
def on_version(self, request):
|
||||
@ -212,7 +212,7 @@ class PeerSession(ClientSession):
|
||||
self.bad('bad server.peers.subscribe response')
|
||||
return
|
||||
|
||||
features = self.peer_mgr.features_to_register(self.peer, peers)
|
||||
features = self.peer_mgr._features_to_register(self.peer, peers)
|
||||
if features:
|
||||
self.logger.info(f'registering ourself with "server.add_peer"')
|
||||
self.send_request('server.add_peer', [features],
|
||||
@ -231,7 +231,7 @@ class PeerSession(ClientSession):
|
||||
'''
|
||||
if not self.all_requests():
|
||||
self.close()
|
||||
self.peer_mgr.set_verification_status(self.peer, self.kind, True)
|
||||
self.peer_mgr._set_verification_status(self.peer, self.kind, True)
|
||||
|
||||
|
||||
class PeerManager(object):
|
||||
@ -264,24 +264,12 @@ class PeerManager(object):
|
||||
self.proxy = None
|
||||
self.last_proxy_try = 0
|
||||
|
||||
def my_clearnet_peer(self):
|
||||
def _my_clearnet_peer(self):
|
||||
'''Returns the clearnet peer representing this server, if any.'''
|
||||
clearnet = [peer for peer in self.myselves if not peer.is_tor]
|
||||
return clearnet[0] if clearnet else None
|
||||
|
||||
def info(self):
|
||||
'''The number of peers.'''
|
||||
self.set_peer_statuses()
|
||||
counter = Counter(peer.status for peer in self.peers)
|
||||
return {
|
||||
'bad': counter[PEER_BAD],
|
||||
'good': counter[PEER_GOOD],
|
||||
'never': counter[PEER_NEVER],
|
||||
'stale': counter[PEER_STALE],
|
||||
'total': len(self.peers),
|
||||
}
|
||||
|
||||
def set_peer_statuses(self):
|
||||
def _set_peer_statuses(self):
|
||||
'''Set peer statuses.'''
|
||||
cutoff = time.time() - STALE_SECS
|
||||
for peer in self.peers:
|
||||
@ -294,22 +282,7 @@ class PeerManager(object):
|
||||
else:
|
||||
peer.status = PEER_NEVER
|
||||
|
||||
def rpc_data(self):
|
||||
'''Peer data for the peers RPC method.'''
|
||||
self.set_peer_statuses()
|
||||
descs = ['good', 'stale', 'never', 'bad']
|
||||
|
||||
def peer_data(peer):
|
||||
data = peer.serialize()
|
||||
data['status'] = descs[peer.status]
|
||||
return data
|
||||
|
||||
def peer_key(peer):
|
||||
return (peer.bad, -peer.last_good)
|
||||
|
||||
return [peer_data(peer) for peer in sorted(self.peers, key=peer_key)]
|
||||
|
||||
def features_to_register(self, peer, remote_peers):
|
||||
def _features_to_register(self, peer, remote_peers):
|
||||
'''If we should register ourselves to the remote peer, which has
|
||||
reported the given list of known peers, return the clearnet
|
||||
identity features to register, otherwise None.
|
||||
@ -320,7 +293,7 @@ class PeerManager(object):
|
||||
# are a non-public IP address, or to ourselves.
|
||||
if not self.env.peer_announce or peer in self.myselves:
|
||||
return None
|
||||
my = self.my_clearnet_peer()
|
||||
my = self._my_clearnet_peer()
|
||||
if not my or not my.is_public:
|
||||
return None
|
||||
# Register if no matches, or ports have changed
|
||||
@ -329,6 +302,193 @@ class PeerManager(object):
|
||||
return None
|
||||
return my.features
|
||||
|
||||
def _permit_new_onion_peer(self):
|
||||
'''Accept a new onion peer only once per random time interval.'''
|
||||
now = time.time()
|
||||
if now < self.permit_onion_peer_time:
|
||||
return False
|
||||
self.permit_onion_peer_time = now + random.randrange(0, 1200)
|
||||
return True
|
||||
|
||||
def _import_peers(self):
|
||||
'''Import hard-coded peers from a file or the coin defaults.'''
|
||||
self.add_peers(self.myselves)
|
||||
|
||||
# Add the hard-coded ones unless only returning self
|
||||
if self.env.peer_discovery != self.env.PD_SELF:
|
||||
coin_peers = self.env.coin.PEERS
|
||||
peers = [Peer.from_real_name(real_name, 'coins.py')
|
||||
for real_name in coin_peers]
|
||||
self.add_peers(peers, limit=None)
|
||||
|
||||
async def _maybe_detect_proxy(self):
|
||||
'''Detect a proxy if we don't have one and some time has passed since
|
||||
the last attempt.
|
||||
|
||||
If found self.proxy is set to a SOCKSProxy instance, otherwise
|
||||
None.
|
||||
'''
|
||||
if self.proxy or time.time() - self.last_proxy_try < 900:
|
||||
return
|
||||
self.last_proxy_try = time.time()
|
||||
|
||||
host = self.env.tor_proxy_host
|
||||
if self.env.tor_proxy_port is None:
|
||||
ports = [9050, 9150, 1080]
|
||||
else:
|
||||
ports = [self.env.tor_proxy_port]
|
||||
self.logger.info(f'trying to detect proxy on "{host}" ports {ports}')
|
||||
|
||||
cls = SOCKSProxy
|
||||
result = await cls.auto_detect_host(host, ports, None, loop=self.loop)
|
||||
if isinstance(result, cls):
|
||||
self.proxy = result
|
||||
self.logger.info(f'detected {self.proxy}')
|
||||
|
||||
async def _peer_discovery_loop(self):
|
||||
'''Main loop performing peer maintenance. This includes
|
||||
|
||||
1) Forgetting unreachable peers.
|
||||
2) Verifying connectivity of new peers.
|
||||
3) Retrying old peers at regular intervals.
|
||||
'''
|
||||
self._import_peers()
|
||||
|
||||
try:
|
||||
while True:
|
||||
await self._maybe_detect_proxy()
|
||||
await self._retry_peers()
|
||||
timeout = self.loop.call_later(WAKEUP_SECS,
|
||||
self.retry_event.set)
|
||||
await self.retry_event.wait()
|
||||
self.retry_event.clear()
|
||||
timeout.cancel()
|
||||
finally:
|
||||
for session in list(PeerSession.sessions):
|
||||
session.abort()
|
||||
await session.wait_closed()
|
||||
|
||||
async def _retry_peers(self):
|
||||
'''Retry peers that are close to getting stale.'''
|
||||
# Exponential backoff of retries
|
||||
now = time.time()
|
||||
nearly_stale_time = (now - STALE_SECS) + WAKEUP_SECS * 2
|
||||
|
||||
def should_retry(peer):
|
||||
# Retry a peer whose ports might have updated
|
||||
if peer.other_port_pairs:
|
||||
return True
|
||||
# Retry a good connection if it is about to turn stale
|
||||
if peer.try_count == 0:
|
||||
return peer.last_good < nearly_stale_time
|
||||
# Retry a failed connection if enough time has passed
|
||||
return peer.last_try < now - WAKEUP_SECS * 2 ** peer.try_count
|
||||
|
||||
peers = [peer for peer in self.peers if should_retry(peer)]
|
||||
|
||||
for peer in peers:
|
||||
peer.try_count += 1
|
||||
pairs = peer.connection_port_pairs()
|
||||
if peer.bad or not pairs:
|
||||
self._maybe_forget_peer(peer)
|
||||
else:
|
||||
self._retry_peer(peer, pairs)
|
||||
|
||||
def _retry_peer(self, peer, port_pairs):
|
||||
peer.last_try = time.time()
|
||||
|
||||
kwargs = {'loop': self.loop}
|
||||
|
||||
kind, port = port_pairs[0]
|
||||
if kind == 'SSL':
|
||||
kwargs['ssl'] = ssl.SSLContext(ssl.PROTOCOL_TLS)
|
||||
|
||||
host = self.env.cs_host(for_rpc=False)
|
||||
if isinstance(host, list):
|
||||
host = host[0]
|
||||
|
||||
if self.env.force_proxy or peer.is_tor:
|
||||
if not self.proxy:
|
||||
return
|
||||
kwargs['proxy'] = self.proxy
|
||||
kwargs['resolve'] = not peer.is_tor
|
||||
elif host:
|
||||
# Use our listening Host/IP for outgoing non-proxy
|
||||
# connections so our peers see the correct source.
|
||||
kwargs['local_addr'] = (host, None)
|
||||
|
||||
session = PeerSession(peer, self, kind, peer.host, port, **kwargs)
|
||||
callback = partial(self._on_connected, peer, port_pairs)
|
||||
self.tasks.create_task(session.create_connection(), callback)
|
||||
|
||||
def _on_connected(self, peer, port_pairs, task):
|
||||
'''Called when a connection attempt succeeds or fails.
|
||||
|
||||
If failed, close the session, log it and try remaining port pairs.
|
||||
'''
|
||||
if not task.cancelled() and task.exception():
|
||||
kind, port = port_pairs.pop(0)
|
||||
elapsed = time.time() - peer.last_try
|
||||
self.logger.info(f'failed connecting to {peer} at {kind} port '
|
||||
f'{port} in {elapsed:.1f}s: {task.exception()}')
|
||||
if port_pairs:
|
||||
self._retry_peer(peer, port_pairs)
|
||||
else:
|
||||
self._maybe_forget_peer(peer)
|
||||
|
||||
def _set_verification_status(self, peer, kind, good):
|
||||
'''Called when a verification succeeded or failed.'''
|
||||
now = time.time()
|
||||
if self.env.force_proxy or peer.is_tor:
|
||||
how = 'via {} over Tor'.format(kind)
|
||||
else:
|
||||
how = 'via {} at {}'.format(kind, peer.ip_addr)
|
||||
status = 'verified' if good else 'failed to verify'
|
||||
elapsed = now - peer.last_try
|
||||
self.logger.info(f'{status} {peer} {how} in {elapsed:.1f}s')
|
||||
|
||||
if good:
|
||||
peer.try_count = 0
|
||||
peer.last_good = now
|
||||
peer.source = 'peer'
|
||||
# At most 2 matches if we're a host name, potentially several if
|
||||
# we're an IP address (several instances can share a NAT).
|
||||
matches = peer.matches(self.peers)
|
||||
for match in matches:
|
||||
if match.ip_address:
|
||||
if len(matches) > 1:
|
||||
self.peers.remove(match)
|
||||
elif peer.host in match.features['hosts']:
|
||||
match.update_features_from_peer(peer)
|
||||
else:
|
||||
self._maybe_forget_peer(peer)
|
||||
|
||||
def _maybe_forget_peer(self, peer):
|
||||
'''Forget the peer if appropriate, e.g. long-term unreachable.'''
|
||||
if peer.last_good and not peer.bad:
|
||||
try_limit = 10
|
||||
else:
|
||||
try_limit = 3
|
||||
forget = peer.try_count >= try_limit
|
||||
|
||||
if forget:
|
||||
desc = 'bad' if peer.bad else 'unreachable'
|
||||
self.logger.info('forgetting {} peer: {}'.format(desc, peer))
|
||||
self.peers.discard(peer)
|
||||
|
||||
return forget
|
||||
|
||||
#
|
||||
# External interface
|
||||
#
|
||||
def start_peer_discovery(self):
|
||||
if self.env.peer_discovery == self.env.PD_ON:
|
||||
self.logger.info(f'beginning peer discovery. Force use of '
|
||||
f'proxy: {self.env.force_proxy}')
|
||||
self.tasks.create_task(self._peer_discovery_loop())
|
||||
else:
|
||||
self.logger.info('peer discovery is disabled')
|
||||
|
||||
def add_peers(self, peers, limit=2, check_ports=False, source=None):
|
||||
'''Add a limited number of peers that are not already present.'''
|
||||
retry = False
|
||||
@ -362,13 +522,17 @@ class PeerManager(object):
|
||||
if retry:
|
||||
self.retry_event.set()
|
||||
|
||||
def permit_new_onion_peer(self):
|
||||
'''Accept a new onion peer only once per random time interval.'''
|
||||
now = time.time()
|
||||
if now < self.permit_onion_peer_time:
|
||||
return False
|
||||
self.permit_onion_peer_time = now + random.randrange(0, 1200)
|
||||
return True
|
||||
def info(self):
|
||||
'''The number of peers.'''
|
||||
self._set_peer_statuses()
|
||||
counter = Counter(peer.status for peer in self.peers)
|
||||
return {
|
||||
'bad': counter[PEER_BAD],
|
||||
'good': counter[PEER_GOOD],
|
||||
'never': counter[PEER_NEVER],
|
||||
'stale': counter[PEER_STALE],
|
||||
'total': len(self.peers),
|
||||
}
|
||||
|
||||
async def on_add_peer(self, features, source_info):
|
||||
'''Add a peer (but only if the peer resolves to the source).'''
|
||||
@ -385,7 +549,7 @@ class PeerManager(object):
|
||||
peer = peers[0]
|
||||
host = peer.host
|
||||
if peer.is_tor:
|
||||
permit = self.permit_new_onion_peer()
|
||||
permit = self._permit_new_onion_peer()
|
||||
reason = 'rate limiting'
|
||||
else:
|
||||
try:
|
||||
@ -444,188 +608,22 @@ class PeerManager(object):
|
||||
|
||||
return [peer.to_tuple() for peer in peers]
|
||||
|
||||
def import_peers(self):
|
||||
'''Import hard-coded peers from a file or the coin defaults.'''
|
||||
self.add_peers(self.myselves)
|
||||
|
||||
# Add the hard-coded ones unless only returning self
|
||||
if self.env.peer_discovery != self.env.PD_SELF:
|
||||
coin_peers = self.env.coin.PEERS
|
||||
peers = [Peer.from_real_name(real_name, 'coins.py')
|
||||
for real_name in coin_peers]
|
||||
self.add_peers(peers, limit=None)
|
||||
|
||||
async def maybe_detect_proxy(self):
|
||||
'''Detect a proxy if we don't have one and some time has passed since
|
||||
the last attempt.
|
||||
|
||||
If found self.proxy is set to a SOCKSProxy instance, otherwise
|
||||
None.
|
||||
'''
|
||||
if self.proxy or time.time() - self.last_proxy_try < 900:
|
||||
return
|
||||
self.last_proxy_try = time.time()
|
||||
|
||||
host = self.env.tor_proxy_host
|
||||
if self.env.tor_proxy_port is None:
|
||||
ports = [9050, 9150, 1080]
|
||||
else:
|
||||
ports = [self.env.tor_proxy_port]
|
||||
self.logger.info(f'trying to detect proxy on "{host}" ports {ports}')
|
||||
|
||||
cls = SOCKSProxy
|
||||
result = await cls.auto_detect_host(host, ports, None, loop=self.loop)
|
||||
if isinstance(result, cls):
|
||||
self.proxy = result
|
||||
self.logger.info(f'detected {self.proxy}')
|
||||
|
||||
def proxy_peername(self):
|
||||
'''Return the peername of the proxy, if there is a proxy, otherwise
|
||||
None.'''
|
||||
return self.proxy.peername if self.proxy else None
|
||||
|
||||
def start_peer_discovery(self):
|
||||
if self.env.peer_discovery == self.env.PD_ON:
|
||||
self.logger.info(f'beginning peer discovery. Force use of '
|
||||
f'proxy: {self.env.force_proxy}')
|
||||
self.tasks.create_task(self.peer_discovery_loop())
|
||||
else:
|
||||
self.logger.info('peer discovery is disabled')
|
||||
def rpc_data(self):
|
||||
'''Peer data for the peers RPC method.'''
|
||||
self._set_peer_statuses()
|
||||
descs = ['good', 'stale', 'never', 'bad']
|
||||
|
||||
async def peer_discovery_loop(self):
|
||||
'''Main loop performing peer maintenance. This includes
|
||||
def peer_data(peer):
|
||||
data = peer.serialize()
|
||||
data['status'] = descs[peer.status]
|
||||
return data
|
||||
|
||||
1) Forgetting unreachable peers.
|
||||
2) Verifying connectivity of new peers.
|
||||
3) Retrying old peers at regular intervals.
|
||||
'''
|
||||
self.import_peers()
|
||||
def peer_key(peer):
|
||||
return (peer.bad, -peer.last_good)
|
||||
|
||||
try:
|
||||
while True:
|
||||
await self.maybe_detect_proxy()
|
||||
await self.retry_peers()
|
||||
timeout = self.loop.call_later(WAKEUP_SECS,
|
||||
self.retry_event.set)
|
||||
await self.retry_event.wait()
|
||||
self.retry_event.clear()
|
||||
timeout.cancel()
|
||||
finally:
|
||||
for session in list(PeerSession.sessions):
|
||||
session.abort()
|
||||
await session.wait_closed()
|
||||
|
||||
def is_coin_onion_peer(self, peer):
|
||||
'''Return true if this peer is a hard-coded onion peer.'''
|
||||
return peer.is_tor and any(peer.host in real_name
|
||||
for real_name in self.env.coin.PEERS)
|
||||
|
||||
async def retry_peers(self):
|
||||
'''Retry peers that are close to getting stale.'''
|
||||
# Exponential backoff of retries
|
||||
now = time.time()
|
||||
nearly_stale_time = (now - STALE_SECS) + WAKEUP_SECS * 2
|
||||
|
||||
def should_retry(peer):
|
||||
# Retry a peer whose ports might have updated
|
||||
if peer.other_port_pairs:
|
||||
return True
|
||||
# Retry a good connection if it is about to turn stale
|
||||
if peer.try_count == 0:
|
||||
return peer.last_good < nearly_stale_time
|
||||
# Retry a failed connection if enough time has passed
|
||||
return peer.last_try < now - WAKEUP_SECS * 2 ** peer.try_count
|
||||
|
||||
peers = [peer for peer in self.peers if should_retry(peer)]
|
||||
|
||||
for peer in peers:
|
||||
peer.try_count += 1
|
||||
pairs = peer.connection_port_pairs()
|
||||
if peer.bad or not pairs:
|
||||
self.maybe_forget_peer(peer)
|
||||
else:
|
||||
self.retry_peer(peer, pairs)
|
||||
|
||||
def retry_peer(self, peer, port_pairs):
|
||||
peer.last_try = time.time()
|
||||
|
||||
kwargs = {'loop': self.loop}
|
||||
|
||||
kind, port = port_pairs[0]
|
||||
if kind == 'SSL':
|
||||
kwargs['ssl'] = ssl.SSLContext(ssl.PROTOCOL_TLS)
|
||||
|
||||
host = self.env.cs_host(for_rpc=False)
|
||||
if isinstance(host, list):
|
||||
host = host[0]
|
||||
|
||||
if self.env.force_proxy or peer.is_tor:
|
||||
if not self.proxy:
|
||||
return
|
||||
kwargs['proxy'] = self.proxy
|
||||
kwargs['resolve'] = not peer.is_tor
|
||||
elif host:
|
||||
# Use our listening Host/IP for outgoing non-proxy
|
||||
# connections so our peers see the correct source.
|
||||
kwargs['local_addr'] = (host, None)
|
||||
|
||||
session = PeerSession(peer, self, kind, peer.host, port, **kwargs)
|
||||
callback = partial(self.on_connected, peer, port_pairs)
|
||||
self.tasks.create_task(session.create_connection(), callback)
|
||||
|
||||
def on_connected(self, peer, port_pairs, task):
|
||||
'''Called when a connection attempt succeeds or fails.
|
||||
|
||||
If failed, close the session, log it and try remaining port pairs.
|
||||
'''
|
||||
if not task.cancelled() and task.exception():
|
||||
kind, port = port_pairs.pop(0)
|
||||
elapsed = time.time() - peer.last_try
|
||||
self.logger.info(f'failed connecting to {peer} at {kind} port '
|
||||
f'{port} in {elapsed:.1f}s: {task.exception()}')
|
||||
if port_pairs:
|
||||
self.retry_peer(peer, port_pairs)
|
||||
else:
|
||||
self.maybe_forget_peer(peer)
|
||||
|
||||
def set_verification_status(self, peer, kind, good):
|
||||
'''Called when a verification succeeded or failed.'''
|
||||
now = time.time()
|
||||
if self.env.force_proxy or peer.is_tor:
|
||||
how = 'via {} over Tor'.format(kind)
|
||||
else:
|
||||
how = 'via {} at {}'.format(kind, peer.ip_addr)
|
||||
status = 'verified' if good else 'failed to verify'
|
||||
elapsed = now - peer.last_try
|
||||
self.logger.info(f'{status} {peer} {how} in {elapsed:.1f}s')
|
||||
|
||||
if good:
|
||||
peer.try_count = 0
|
||||
peer.last_good = now
|
||||
peer.source = 'peer'
|
||||
# At most 2 matches if we're a host name, potentially several if
|
||||
# we're an IP address (several instances can share a NAT).
|
||||
matches = peer.matches(self.peers)
|
||||
for match in matches:
|
||||
if match.ip_address:
|
||||
if len(matches) > 1:
|
||||
self.peers.remove(match)
|
||||
elif peer.host in match.features['hosts']:
|
||||
match.update_features_from_peer(peer)
|
||||
else:
|
||||
self.maybe_forget_peer(peer)
|
||||
|
||||
def maybe_forget_peer(self, peer):
|
||||
'''Forget the peer if appropriate, e.g. long-term unreachable.'''
|
||||
if peer.last_good and not peer.bad:
|
||||
try_limit = 10
|
||||
else:
|
||||
try_limit = 3
|
||||
forget = peer.try_count >= try_limit
|
||||
|
||||
if forget:
|
||||
desc = 'bad' if peer.bad else 'unreachable'
|
||||
self.logger.info('forgetting {} peer: {}'.format(desc, peer))
|
||||
self.peers.discard(peer)
|
||||
|
||||
return forget
|
||||
return [peer_data(peer) for peer in sorted(self.peers, key=peer_key)]
|
||||
|
||||
Loading…
Reference in New Issue
Block a user