Convert PeerSession to use aiorpcX

This commit is contained in:
Neil Booth 2018-03-11 16:02:55 +08:00
parent 0ef6267396
commit e69b1d930f

View File

@ -17,7 +17,6 @@ from functools import partial
import aiorpcx import aiorpcx
from lib.jsonrpc import JSONSession
from lib.peer import Peer from lib.peer import Peer
import lib.util as util import lib.util as util
import server.version as version import server.version as version
@ -28,191 +27,178 @@ STALE_SECS = 24 * 3600
WAKEUP_SECS = 300 WAKEUP_SECS = 300
class PeerSession(JSONSession): class PeerSession(aiorpcx.ClientSession, util.LoggedClass):
'''An outgoing session to a peer.''' '''An outgoing session to a peer.'''
def __init__(self, peer, peer_mgr, kind): def __init__(self, peer, peer_mgr, kind, host, port, **kwargs):
super().__init__() super().__init__(host, port, **kwargs)
self.max_send = 0 util.LoggedClass.__init__(self)
self.peer = peer self.peer = peer
self.peer_mgr = peer_mgr self.peer_mgr = peer_mgr
self.kind = kind self.kind = kind
self.failed = False self.timeout = 20 if self.peer.is_tor else 10
self.bad = False
self.remote_peers = None
self.log_prefix = '[{}] '.format(self.peer)
async def wait_on_items(self):
while True:
await self.items_event.wait()
await self.process_pending_items()
def connection_made(self, transport): def connection_made(self, transport):
'''Handle an incoming client connection.''' '''Handle an incoming client connection.'''
super().connection_made(transport) super().connection_made(transport)
self.log_prefix = '[{}] '.format(str(self.peer)[:25])
self.future = self.peer_mgr.ensure_future(self.wait_on_items())
# Update IP address # Update IP address if not Tor
if not self.peer.is_tor: if not self.peer.is_tor:
peer_info = self.peer_info() address = self.peer_address()
if peer_info: if address:
self.peer.ip_addr = peer_info[0] self.peer.ip_addr = address[0]
# Collect data # Send server.version first
proto_ver = (version.PROTOCOL_MIN, version.PROTOCOL_MAX) args = [version.VERSION, [version.PROTOCOL_MIN, version.PROTOCOL_MAX]]
self.send_request(self.on_version, 'server.version', self.send_request('server.version', args, self.on_version,
[version.VERSION, proto_ver]) timeout=self.timeout)
self.send_request(self.on_features, 'server.features')
self.send_request(self.on_height, 'blockchain.headers.subscribe')
self.send_request(self.on_peers_subscribe, 'server.peers.subscribe')
def connection_lost(self, exc): def is_good(self, request, instance):
'''Handle disconnection.''' try:
super().connection_lost(exc) result = request.result()
self.future.cancel() except asyncio.CancelledError:
return False
except asyncio.TimeoutError as e:
self.fail(request, str(e))
return False
except RPCError as error:
self.fail(request, f'{error.message} ({error.code})')
return False
def on_peers_subscribe(self, result, error): if isinstance(result, instance):
'''Handle the response to the peers.subcribe message.''' return True
if error:
self.failed = True
self.log_error('server.peers.subscribe: {}'.format(error))
else:
# Save for later analysis
self.remote_peers = result
self.close_if_done()
def on_add_peer(self, result, error): self.fail(request, f'{request} returned bad result type '
'''We got a response the add_peer message.''' f'{type(result).__name__}')
# This is the last thing we were waiting for; shutdown the connection return False
self.shutdown_connection()
def on_features(self, features, error): def fail(self, request, reason):
# Several peers don't implement this. If they do, check they are self.logger.error(f'[{self.peer.host}] {request} failed: {reason}')
# the same network with the genesis hash. self.peer_mgr.set_verification_status(self.peer, self.kind, False)
if not error and isinstance(features, dict): self.close()
hosts = [host.lower() for host in features.get('hosts', {})]
our_hash = self.peer_mgr.env.coin.GENESIS_HASH
if our_hash != features.get('genesis_hash'):
self.bad = True
self.log_warning('incorrect genesis hash')
elif self.peer.host.lower() in hosts:
self.peer.update_features(features)
else:
self.bad = True
self.log_warning('ignoring - not listed in host list {}'
.format(hosts))
self.close_if_done()
def on_height(self, result, error): def bad(self, reason):
'''Handle the response to blockchain.headers.subscribe message.''' self.logger.error(f'[{self.peer.host}] marking bad: {reason}')
if error: self.peer.mark_bad()
self.failed = True self.peer_mgr.set_verification_status(self.peer, self.kind, False)
self.log_error('blockchain.headers.subscribe returned an error') self.close()
elif not isinstance(result, dict):
self.bad = True
self.log_error('bad blockchain.headers.subscribe response')
else:
controller = self.peer_mgr.controller
our_height = controller.bp.db_height
their_height = result.get('block_height')
if not isinstance(their_height, int):
self.log_warning('invalid height {}'.format(their_height))
self.bad = True
elif abs(our_height - their_height) > 5:
self.log_warning('bad height {:,d} (ours: {:,d})'
.format(their_height, our_height))
self.bad = True
# Check prior header too in case of hard fork. def on_version(self, request):
if not self.bad:
check_height = min(our_height, their_height)
self.send_request(self.on_header, 'blockchain.block.get_header',
[check_height])
self.expected_header = controller.electrum_header(check_height)
self.close_if_done()
def on_header(self, result, error):
'''Handle the response to blockchain.block.get_header message.
Compare hashes of prior header in attempt to determine if forked.'''
if error:
self.failed = True
self.log_error('blockchain.block.get_header returned an error')
elif not isinstance(result, dict):
self.bad = True
self.log_error('bad blockchain.block.get_header response')
else:
theirs = result.get('prev_block_hash')
ours = self.expected_header.get('prev_block_hash')
if ours != theirs:
self.log_error('our header hash {} and theirs {} differ'
.format(ours, theirs))
self.bad = True
self.close_if_done()
def on_version(self, result, error):
'''Handle the response to the version message.''' '''Handle the response to the version message.'''
if error: if not self.is_good(request, (list, str)):
self.failed = True return
self.log_error('server.version returned an error')
result = request.result()
if isinstance(result, str):
version = result
else: else:
# Protocol version 1.1 returns a pair with the version first # Protocol version 1.1 returns a pair with the version first
if isinstance(result, list) and len(result) == 2: if len(result) < 2 or not isinstance(result[0], str):
result = result[0] self.fail(request, 'result array bad format')
if isinstance(result, str): return
self.peer.server_version = result version = result[0]
self.peer.features['server_version'] = result self.peer.server_version = version
self.close_if_done() self.peer.features['server_version'] = version
def check_remote_peers(self): for method, on_done in [
'''Check the peers list we got from a remote peer. ('blockchain.headers.subscribe', self.on_height),
('server.features', self.on_features),
('server.peers.subscribe', self.on_peers_subscribe),
]:
self.send_request(method, on_done=on_done, timeout=self.timeout)
Each update is expected to be of the form: def on_features(self, request):
[ip_addr, hostname, ['v1.0', 't51001', 's51002']] if not self.is_good(request, dict):
return
Call add_peer if the remote doesn't appear to know about us. features = request.result()
''' hosts = [host.lower() for host in features.get('hosts', {})]
our_hash = self.peer_mgr.env.coin.GENESIS_HASH
if our_hash != features.get('genesis_hash'):
self.bad('incorrect genesis hash')
elif self.peer.host.lower() in hosts:
self.peer.update_features(features)
self.maybe_close()
else:
self.bad('ignoring - not listed in host list {}'.format(hosts))
def on_height(self, request):
'''Handle the response to blockchain.headers.subscribe message.'''
if not self.is_good(request, dict):
return
result = request.result()
controller = self.peer_mgr.controller
our_height = controller.bp.db_height
their_height = result.get('block_height')
if not isinstance(their_height, int):
self.bad('invalid height {}'.format(their_height))
return
if abs(our_height - their_height) > 5:
self.bad('bad height {:,d} (ours: {:,d})'
.format(their_height, our_height))
return
# Check prior header too in case of hard fork.
check_height = min(our_height, their_height)
expected_header = controller.electrum_header(check_height)
self.send_request('blockchain.block.get_header', [check_height],
partial(self.on_header, expected_header),
timeout=self.timeout)
def on_header(self, expected_header, request):
'''Handle the response to blockchain.block.get_header message.
Compare hashes of prior header in attempt to determine if forked.'''
if not self.is_good(request, dict):
return
result = request.result()
theirs = result.get('prev_block_hash')
ours = expected_header.get('prev_block_hash')
if ours == theirs:
self.maybe_close()
else:
self.bad('our header hash {} and theirs {} differ'
.format(ours, theirs))
def on_peers_subscribe(self, request):
'''Handle the response to the peers.subcribe message.'''
if not self.is_good(request, list):
return
# Check the peers list we got from a remote peer.
# Each is expected to be of the form:
# [ip_addr, hostname, ['v1.0', 't51001', 's51002']]
# Call add_peer if the remote doesn't appear to know about us.
raw_peers = request.result()
try: try:
real_names = [' '.join([u[1]] + u[2]) for u in self.remote_peers] real_names = [' '.join([u[1]] + u[2]) for u in raw_peers]
peers = [Peer.from_real_name(real_name, str(self.peer)) peers = [Peer.from_real_name(real_name, str(self.peer))
for real_name in real_names] for real_name in real_names]
except Exception: except Exception:
self.log_error('bad server.peers.subscribe response') self.bad('bad server.peers.subscribe response')
return return
self.peer_mgr.add_peers(peers) features = self.peer_mgr.features_to_register(self.peer, peers)
if features:
self.logger.info(f'[{self.peer.host}] registering ourself with '
'"server.add_peer"')
self.send_request('server.add_peer', [features],
self.on_add_peer, timeout=self.timeout)
else:
self.maybe_close()
# Announce ourself if not present. Don't if disabled, we def on_add_peer(self, request):
# are a non-public IP address, or to ourselves. '''We got a response the add_peer message. Don't care about its
if not self.peer_mgr.env.peer_announce: form.'''
return self.maybe_close()
if self.peer in self.peer_mgr.myselves:
return
my = self.peer_mgr.my_clearnet_peer()
if not my or not my.is_public:
return
for peer in my.matches(peers):
if peer.tcp_port == my.tcp_port and peer.ssl_port == my.ssl_port:
return
self.log_info('registering ourself with server.add_peer') def maybe_close(self):
self.send_request(self.on_add_peer, 'server.add_peer', [my.features]) '''Close the connection if no requests are outstanding, and mark peer
as good.
def close_if_done(self): '''
if not self.has_pending_requests(): if not self.all_requests():
if self.bad: self.close()
self.peer.mark_bad() self.peer_mgr.set_verification_status(self.peer, self.kind, True)
elif self.remote_peers:
self.check_remote_peers()
# We might now be waiting for an add_peer response
if not self.has_pending_requests():
self.shutdown_connection()
def shutdown_connection(self):
is_good = not (self.failed or self.bad)
self.peer_mgr.set_verification_status(self.peer, self.kind, is_good)
self.close_connection()
class PeerManager(util.LoggedClass): class PeerManager(util.LoggedClass):
@ -287,6 +273,26 @@ class PeerManager(util.LoggedClass):
return [peer_data(peer) for peer in sorted(self.peers, key=peer_key)] return [peer_data(peer) for peer in sorted(self.peers, key=peer_key)]
def features_to_register(self, peer, remote_peers):
'''If we should register ourselves to the remote peer, which has
reported the given list of known peers, return the clearnet
identity features to register, otherwise None.
'''
self.add_peers(remote_peers)
# Announce ourself if not present. Don't if disabled, we
# are a non-public IP address, or to ourselves.
if not self.env.peer_announce or peer in self.myselves:
return None
my = self.my_clearnet_peer()
if not my or not my.is_public:
return None
# Register if no matches, or ports have changed
for peer in my.matches(remote_peers):
if peer.tcp_port == my.tcp_port and peer.ssl_port == my.ssl_port:
return None
return my.features
def add_peers(self, peers, limit=2, check_ports=False, source=None): def add_peers(self, peers, limit=2, check_ports=False, source=None):
'''Add a limited number of peers that are not already present.''' '''Add a limited number of peers that are not already present.'''
retry = False retry = False
@ -505,45 +511,43 @@ class PeerManager(util.LoggedClass):
def retry_peer(self, peer, port_pairs): def retry_peer(self, peer, port_pairs):
peer.last_try = time.time() peer.last_try = time.time()
kwargs = {'loop': self.loop}
kind, port = port_pairs[0] kind, port = port_pairs[0]
sslc = ssl.SSLContext(ssl.PROTOCOL_TLS) if kind == 'SSL' else None if kind == 'SSL':
kwargs['ssl'] = ssl.SSLContext(ssl.PROTOCOL_TLS)
host = self.env.cs_host(for_rpc=False) host = self.env.cs_host(for_rpc=False)
if isinstance(host, list): if isinstance(host, list):
host = host[0] host = host[0]
kwargs = {'ssl': sslc}
if self.env.force_proxy or peer.is_tor: if self.env.force_proxy or peer.is_tor:
# Only attempt a proxy connection if we have one
if not self.proxy: if not self.proxy:
return return
create_connection = self.proxy.create_connection kwargs['proxy'] = self.proxy
else: elif host:
create_connection = self.loop.create_connection # Use our listening Host/IP for outgoing non-proxy
# Use our listening Host/IP for outgoing connections so # connections so our peers see the correct source.
# our peers see the correct source. kwargs['local_addr'] = (host, None)
if host:
kwargs['local_addr'] = (host, None)
protocol_factory = partial(PeerSession, peer, self, kind) session = PeerSession(peer, self, kind, peer.host, port, **kwargs)
coro = create_connection(protocol_factory, peer.host, port, **kwargs) callback = partial(self.on_connected, session, peer, port_pairs)
callback = partial(self.connection_done, peer, port_pairs) self.ensure_future(session.create_connection(), callback)
self.ensure_future(coro, callback)
def connection_done(self, peer, port_pairs, future): def on_connected(self, session, peer, port_pairs, future):
'''Called when a connection attempt succeeds or fails. '''Called when a connection attempt succeeds or fails.
If failed, log it and try remaining port pairs. If none, If failed, close the session, log it and try remaining port pairs.
release the connection count semaphore.
''' '''
exception = future.exception() exception = future.exception()
if exception: if exception:
kind, port = port_pairs[0] session.close()
self.logger.info('failed connecting to {} at {} port {:d} ' kind, port = port_pairs.pop(0)
'in {:.1f}s: {}' self.log_info('failed connecting to {} at {} port {:d} '
.format(peer, kind, port, 'in {:.1f}s: {}'
time.time() - peer.last_try, exception)) .format(peer, kind, port,
port_pairs = port_pairs[1:] time.time() - peer.last_try, exception))
if port_pairs: if port_pairs:
self.retry_peer(peer, port_pairs) self.retry_peer(peer, port_pairs)
else: else: