diff --git a/docs/ENVIRONMENT.rst b/docs/ENVIRONMENT.rst index 41f4284..0835acd 100644 --- a/docs/ENVIRONMENT.rst +++ b/docs/ENVIRONMENT.rst @@ -89,7 +89,7 @@ These environment variables are optional: * **SSL_PORT** If set ElectrumX will serve SSL clients on **HOST**:**SSL_PORT**. - If set SSL_CERTFILE and SSL_KEYFILE must be defined and be + If set then SSL_CERTFILE and SSL_KEYFILE must be defined and be filesystem paths to those SSL files. * **RPC_PORT** diff --git a/server/controller.py b/server/controller.py index 8692e27..334608b 100644 --- a/server/controller.py +++ b/server/controller.py @@ -91,12 +91,10 @@ class Controller(util.LoggedClass): ('server', 'banner donation_address'), ] - handlers = {'.'.join([prefix, suffix]): - getattr(self, suffix.replace('.', '_')) - for prefix, suffixes in rpcs - for suffix in suffixes.split()} - handlers['server.peers.subscribe'] = self.peers.subscribe - self.electrumx_handlers = handlers + self.electrumx_handlers = {'.'.join([prefix, suffix]): + getattr(self, suffix.replace('.', '_')) + for prefix, suffixes in rpcs + for suffix in suffixes.split()} async def mempool_transactions(self, hashX): '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool @@ -358,6 +356,11 @@ class Controller(util.LoggedClass): for session in sessions: await session.notify(self.bp.db_height, touched) + def notify_peers(self, updates): + '''Notify of peer updates.''' + for session in self.sessions: + session.notify_peers(updates) + def electrum_header(self, height): '''Return the binary header at the given height.''' if not 0 <= height <= self.bp.db_height: @@ -605,7 +608,7 @@ class Controller(util.LoggedClass): def rpc_peers(self): '''Return a list of server peers, currently taken from IRC.''' - return self.peers.peer_list() + return self.peers.peer_dict() def rpc_reorg(self, count=3): '''Force a reorg of the given number of blocks. diff --git a/server/peers.py b/server/peers.py index 2f1e832..7017f04 100644 --- a/server/peers.py +++ b/server/peers.py @@ -7,6 +7,8 @@ '''Peer management.''' +import asyncio +import itertools import socket from collections import namedtuple @@ -32,20 +34,27 @@ class PeerManager(util.LoggedClass): self.env = env self.controller = controller self.irc = IRC(env, self) - self.identities = [] + self.pruning = None + self._identities = [] # Keyed by nick self.irc_peers = {} + self.updated_nicks = set() # We can have a Tor identity inaddition to a normal one - self.identities.append(NetIdentity(env.report_host, - env.report_tcp_port, - env.report_ssl_port, - '')) + self._identities.append(self.identity(env.report_host, + env.report_tcp_port, + env.report_ssl_port, + '')) if env.report_host_tor.endswith('.onion'): - self.identities.append(NetIdentity(env.report_host_tor, - env.report_tcp_port_tor, - env.report_ssl_port_tor, - '_tor')) + self._identities.append(self.identity(env.report_host_tor, + env.report_tcp_port_tor, + env.report_ssl_port_tor, + '_tor')) + + @classmethod + def identity(self, host, tcp_port, ssl_port, suffix): + '''Returns a NetIdentity object. Unpublished ports are None.''' + return NetIdentity(host, tcp_port or None, ssl_port or None, suffix) @classmethod def real_name(cls, identity): @@ -62,19 +71,29 @@ class PeerManager(util.LoggedClass): ssl = port_text('s', identity.ssl_port) return '{} v{}{}{}'.format(identity.host, cls.VERSION, tcp, ssl) - def start_irc(self): - '''Start up the IRC connections if enabled.''' + def identities(self): + '''Return a list of network identities of this server.''' + return self._identities + + async def refresh_peer_subs(self): + for n in itertools.count(): + await asyncio.sleep(60) + updates = [self.irc_peers[nick] for nick in self.updated_nicks + if nick in self.irc_peers] + if updates: + self.controller.notify_peers(updates) + self.updated_nicks.clear() + + async def main_loop(self): + '''Not a loop for now...''' + self.controller.ensure_future(self.refresh_peer_subs()) if self.env.irc: name_pairs = [(self.real_name(identity), identity.nick_suffix) - for identity in self.identities] + for identity in self._identities] self.controller.ensure_future(self.irc.start(name_pairs)) else: self.logger.info('IRC is disabled') - async def main_loop(self): - '''Main loop. No loop for now.''' - self.start_irc() - def dns_lookup_peer(self, nick, hostname, details): try: ip_addr = None @@ -83,6 +102,7 @@ class PeerManager(util.LoggedClass): except socket.error: pass # IPv6? ip_addr = ip_addr or hostname + self.updated_nicks.add(nick) self.irc_peers[nick] = IRCPeer(ip_addr, hostname, details) self.logger.info('new IRC peer {} at {} ({})' .format(nick, hostname, details)) @@ -102,11 +122,9 @@ class PeerManager(util.LoggedClass): def count(self): return len(self.irc_peers) - def peer_list(self): + def peer_dict(self): return self.irc_peers - def subscribe(self): - '''Returns the server peers as a list of (ip, host, details) tuples. - - Despite the name this is not currently treated as a subscription.''' + def peer_list(self): + '''Returns the server peers as a list of (ip, host, details) tuples.''' return list(self.irc_peers.values()) diff --git a/server/session.py b/server/session.py index 0b341e8..a4e7fd0 100644 --- a/server/session.py +++ b/server/session.py @@ -7,7 +7,6 @@ '''Classes for local RPC server and remote client TCP/SSL servers.''' - import time import traceback from functools import partial @@ -105,6 +104,7 @@ class ElectrumX(SessionBase): super().__init__(*args, **kwargs) self.subscribe_headers = False self.subscribe_height = False + self.subscribe_peers = False self.notified_height = None self.max_send = self.env.max_send self.max_subs = self.env.max_session_subs @@ -114,6 +114,8 @@ class ElectrumX(SessionBase): 'blockchain.headers.subscribe': self.headers_subscribe, 'blockchain.numblocks.subscribe': self.numblocks_subscribe, 'blockchain.transaction.broadcast': self.transaction_broadcast, + 'server.features': self.server_features, + 'server.peers.subscribe': self.peers_subscribe, 'server.version': self.server_version, } @@ -167,6 +169,23 @@ class ElectrumX(SessionBase): self.subscribe_height = True return self.height() + def peers_subscribe(self, incremental=False): + '''Returns the server peers as a list of (ip, host, details) tuples. + + If incremental is False there is no subscription. If True the + remote side will receive notifications of new or modified + peers (peers that disappeared are not notified). + ''' + self.subscribe_peers = incremental + return self.controller.peers.peer_list() + + def notify_peers(self, updates): + '''Notify of peer updates. Updates are sent as a list in the same + format as the subscription reply, as the first parameter. + ''' + if self.subscribe_peers: + self.send_notification('server.peers.subscribe', [updates]) + async def address_subscribe(self, address): '''Subscribe to an address. @@ -180,6 +199,20 @@ class ElectrumX(SessionBase): self.hashX_subs[hashX] = address return status + def server_features(self): + '''Returns a dictionary of server features.''' + peers = self.controller.peers + hosts = {identity.host: { + 'tcp_port': identity.tcp_port, + 'ssl_port': identity.ssl_port, + 'pruning': peers.pruning, + 'version': peers.VERSION, + } for identity in self.controller.peers.identities()} + + return { + 'hosts': hosts, + } + def server_version(self, client_name=None, protocol_version=None): '''Returns the server version as a string.