diff --git a/lib/coins.py b/lib/coins.py index ee63384..c9fd1d6 100644 --- a/lib/coins.py +++ b/lib/coins.py @@ -40,6 +40,9 @@ class Coin(object): IRC_SERVER = "irc.freenode.net" IRC_PORT = 6667 HASHX_LEN = 11 + # Peer discovery + PEER_DEFAULT_PORTS = {'t':'50001', 's':'50002'} + PEERS = [] @classmethod def lookup_coin_class(cls, name, net): @@ -274,6 +277,21 @@ class Bitcoin(Coin): IRC_PREFIX = "E_" IRC_CHANNEL = "#electrum" RPC_PORT = 8332 + PEERS = [ + '4cii7ryno5j3axe4.onion t' + 'btc.smsys.me s995', + 'ca6ulp2j2mpsft3y.onion s t', + 'electrum.be s t', + 'electrum.trouth.net s t', + 'electrum.vom-stausee.de s t', + 'electrum3.hachre.de s t', + 'Electrum.hsmiths.com s t', + 'erbium1.sytes.net s t', + 'h.1209k.com s t', + 'helicarrier.bauerj.eu s t', + 'ozahtqwp25chjdjd.onion s t', + 'us11.einfachmalnettsein.de s t', + ] class BitcoinTestnet(Bitcoin): @@ -292,7 +310,14 @@ class BitcoinTestnet(Bitcoin): TX_PER_BLOCK = 21 IRC_PREFIX = "ET_" RPC_PORT = 18332 - + PEER_DEFAULT_PORTS = {'t':'51001', 's':'51002'} + PEERS = [ + 'electrum.akinbo.org s t', + 'he36kyperp3kbuxu.onion s t', + 'electrum-btc-testnet.petrkr.net s t', + 'h.hsmiths.com t53011 s53012', + 'testnet.not.fyi s t', + ] class BitcoinTestnetSegWit(BitcoinTestnet): '''Bitcoin Testnet for Core bitcoind >= 0.13.1. diff --git a/lib/jsonrpc.py b/lib/jsonrpc.py index a3b9cc8..fb2240d 100644 --- a/lib/jsonrpc.py +++ b/lib/jsonrpc.py @@ -452,6 +452,8 @@ class JSONSessionBase(util.LoggedClass): return self.response_bytes(result, payload['id']) except RPCError as e: return self.error_bytes(e.msg, e.code, self.payload_id(payload)) + except asyncio.CancelledError: + raise except Exception: self.log_error(traceback.format_exc()) return self.error_bytes('internal error processing request', @@ -701,3 +703,15 @@ class JSONSession(JSONSessionBase, asyncio.Protocol): def send_bytes(self, binary): '''Send JSON text over the transport.''' self.transport.writelines((binary, b'\n')) + + def peer_addr(self, anon=True): + '''Return the peer address and port.''' + peer_info = self.peer_info() + if not peer_info: + return 'unknown' + if anon: + return 'xx.xx.xx.xx:xx' + if ':' in peer_info[0]: + return '[{}]:{}'.format(peer_info[0], peer_info[1]) + else: + return '{}:{}'.format(peer_info[0], peer_info[1]) diff --git a/lib/util.py b/lib/util.py index 54f40e1..1fdf1f7 100644 --- a/lib/util.py +++ b/lib/util.py @@ -181,11 +181,16 @@ class LogicalFile(object): ''' file_num, offset = divmod(start, self.file_size) filename = self.filename_fmt.format(file_num) - try: - f= open(filename, 'rb+') - except FileNotFoundError: - if not create: - raise - f = open(filename, 'wb+') + f = open_file(filename, create) f.seek(offset) return f + + +def open_file(filename, create=False): + '''Open the file name. Return its handle.''' + try: + return open(filename, 'rb+') + except FileNotFoundError: + if create: + return open(filename, 'wb+') + raise diff --git a/server/controller.py b/server/controller.py index 334608b..0e5ce8f 100644 --- a/server/controller.py +++ b/server/controller.py @@ -53,7 +53,7 @@ class Controller(util.LoggedClass): self.daemon = Daemon(env.coin.daemon_urls(env.daemon_url)) self.bp = BlockProcessor(env, self, self.daemon) self.mempool = MemPool(self.bp, self) - self.peers = PeerManager(env, self) + self.peer_mgr = PeerManager(env, self) self.env = env self.servers = {} # Map of session to the key of its list in self.groups @@ -65,7 +65,7 @@ class Controller(util.LoggedClass): self.max_sessions = env.max_sessions self.low_watermark = self.max_sessions * 19 // 20 self.max_subs = env.max_subs - self.futures = set() + self.futures = {} # Cache some idea of room to avoid recounting on each subscription self.subs_room = 0 self.next_stale_check = 0 @@ -163,7 +163,7 @@ class Controller(util.LoggedClass): data = self.session_data(for_log=True) for line in Controller.sessions_text_lines(data): self.logger.info(line) - self.logger.info(json.dumps(self.server_summary())) + self.logger.info(json.dumps(self.getinfo())) self.next_log_sessions = time.time() + self.env.log_sessions await asyncio.sleep(1) @@ -208,28 +208,31 @@ class Controller(util.LoggedClass): '''Schedule running func in the executor, return a task.''' return self.ensure_future(self.run_in_executor(func, *args)) - def ensure_future(self, coro): + def ensure_future(self, coro, callback=None): '''Schedule the coro to be run.''' future = asyncio.ensure_future(coro) future.add_done_callback(self.on_future_done) - self.futures.add(future) + self.futures[future] = callback return future def on_future_done(self, future): '''Collect the result of a future after removing it from our set.''' - self.futures.remove(future) - try: - future.result() - except asyncio.CancelledError: - pass - except Exception: - self.log_error(traceback.format_exc()) + callback = self.futures.pop(future) + if callback: + callback(future) + else: + try: + future.result() + except asyncio.CancelledError: + pass + except Exception: + self.log_error(traceback.format_exc()) async def wait_for_bp_catchup(self): '''Called when the block processor catches up.''' await self.bp.caught_up_event.wait() self.logger.info('block processor has caught up') - self.ensure_future(self.peers.main_loop()) + self.ensure_future(self.peer_mgr.main_loop()) self.ensure_future(self.start_servers()) self.ensure_future(self.mempool.main_loop()) self.ensure_future(self.enqueue_delayed_sessions()) @@ -444,7 +447,7 @@ class Controller(util.LoggedClass): '''The number of connections that we've sent something to.''' return len(self.sessions) - def server_summary(self): + def getinfo(self): '''A one-line summary of server state.''' return { 'daemon_height': self.daemon.cached_height(), @@ -455,7 +458,7 @@ class Controller(util.LoggedClass): 'logged': len([s for s in self.sessions if s.log_me]), 'paused': sum(s.pause for s in self.sessions), 'pid': os.getpid(), - 'peers': self.peers.count(), + 'peers': self.peer_mgr.count(), 'requests': sum(s.count_pending_items() for s in self.sessions), 'sessions': self.session_count(), 'subs': self.sub_count(), @@ -517,7 +520,7 @@ class Controller(util.LoggedClass): return ('{:3d}:{:02d}:{:02d}' .format(t // 3600, (t % 3600) // 60, t % 60)) - fmt = ('{:<6} {:<5} {:>15} {:>5} {:>5} ' + fmt = ('{:<6} {:<5} {:>17} {:>5} {:>5} ' '{:>7} {:>7} {:>7} {:>7} {:>7} {:>9} {:>21}') yield fmt.format('ID', 'Flags', 'Client', 'Reqs', 'Txs', 'Subs', 'Recv', 'Recv KB', 'Sent', 'Sent KB', 'Time', 'Peer') @@ -596,20 +599,20 @@ class Controller(util.LoggedClass): def rpc_getinfo(self): '''Return summary information about the server process.''' - return self.server_summary() + return self.getinfo() def rpc_groups(self): '''Return statistics about the session groups.''' return self.group_data() + def rpc_peers(self): + '''Return a list of data about server peers.''' + return self.peer_mgr.rpc_data() + def rpc_sessions(self): '''Return statistics about connected sessions.''' return self.session_data(for_log=False) - def rpc_peers(self): - '''Return a list of server peers, currently taken from IRC.''' - return self.peers.peer_dict() - def rpc_reorg(self, count=3): '''Force a reorg of the given number of blocks. diff --git a/server/db.py b/server/db.py index 44dacf3..a11a316 100644 --- a/server/db.py +++ b/server/db.py @@ -99,7 +99,7 @@ class DB(util.LoggedClass): self.logger.info('created new database') self.logger.info('creating metadata diretcory') os.mkdir('meta') - with self.open_file('COIN', create=True) as f: + with util.open_file('COIN', create=True) as f: f.write('ElectrumX databases and metadata for {} {}' .format(self.coin.NAME, self.coin.NET).encode()) else: @@ -183,15 +183,6 @@ class DB(util.LoggedClass): self.clear_excess_history(self.utxo_flush_count) self.clear_excess_undo_info() - def open_file(self, filename, create=False): - '''Open the file name. Return its handle.''' - try: - return open(filename, 'rb+') - except FileNotFoundError: - if create: - return open(filename, 'wb+') - raise - def fs_update(self, fs_height, headers, block_tx_hashes): '''Write headers, the tx_count array and block tx hashes to disk. diff --git a/server/env.py b/server/env.py index 167b2aa..64541ff 100644 --- a/server/env.py +++ b/server/env.py @@ -8,12 +8,16 @@ '''Class for handling environment configuration and defaults.''' +from collections import namedtuple from os import environ from lib.coins import Coin from lib.util import LoggedClass +NetIdentity = namedtuple('NetIdentity', 'host tcp_port ssl_port nick_suffix') + + class Env(LoggedClass): '''Wraps environment configuration.''' @@ -55,18 +59,24 @@ class Env(LoggedClass): # IRC self.irc = self.default('IRC', False) self.irc_nick = self.default('IRC_NICK', None) - self.report_tcp_port = self.integer('REPORT_TCP_PORT', self.tcp_port) - self.report_ssl_port = self.integer('REPORT_SSL_PORT', self.ssl_port) - self.report_host = self.default('REPORT_HOST', self.host) - self.report_tcp_port_tor = self.integer('REPORT_TCP_PORT_TOR', - self.report_tcp_port - if self.report_tcp_port else - self.tcp_port) - self.report_ssl_port_tor = self.integer('REPORT_SSL_PORT_TOR', - self.report_ssl_port - if self.report_ssl_port else - self.ssl_port) - self.report_host_tor = self.default('REPORT_HOST_TOR', '') + self.identity = NetIdentity( + self.default('REPORT_HOST', self.host), + self.integer('REPORT_TCP_PORT', self.tcp_port) or None, + self.integer('REPORT_SSL_PORT', self.ssl_port) or None, + '' + ) + self.tor_identity = NetIdentity( + self.default('REPORT_HOST_TOR', ''), # must be a string + self.integer('REPORT_TCP_PORT_TOR', + self.identity.tcp_port + if self.identity.tcp_port else + self.tcp_port) or None, + self.integer('REPORT_SSL_PORT_TOR', + self.identity.ssl_port + if self.identity.ssl_port else + self.ssl_port) or None, + '_tor' + ) def default(self, envvar, default): return environ.get(envvar, default) diff --git a/server/peers.py b/server/peers.py index 7017f04..c7fdaa2 100644 --- a/server/peers.py +++ b/server/peers.py @@ -8,7 +8,6 @@ '''Peer management.''' import asyncio -import itertools import socket from collections import namedtuple @@ -16,7 +15,6 @@ import lib.util as util from server.irc import IRC -NetIdentity = namedtuple('NetIdentity', 'host tcp_port ssl_port nick_suffix') IRCPeer = namedtuple('IRCPeer', 'ip_addr host details') @@ -26,8 +24,7 @@ class PeerManager(util.LoggedClass): Attempts to maintain a connection with up to 8 peers. Issues a 'peers.subscribe' RPC to them and tells them our data. ''' - VERSION = '1.0' - DEFAULT_PORTS = {'t': 50001, 's': 50002} + PROTOCOL_VERSION = '1.0' def __init__(self, env, controller): super().__init__() @@ -38,59 +35,44 @@ class PeerManager(util.LoggedClass): self._identities = [] # Keyed by nick self.irc_peers = {} - self.updated_nicks = set() + self._identities.append(env.identity) + if env.tor_identity.host.endswith('.onion'): + self._identities.append(env.tor_identity) - # We can have a Tor identity inaddition to a normal one - self._identities.append(self.identity(env.report_host, - env.report_tcp_port, - env.report_ssl_port, - '')) - if env.report_host_tor.endswith('.onion'): - self._identities.append(self.identity(env.report_host_tor, - env.report_tcp_port_tor, - env.report_ssl_port_tor, - '_tor')) - - @classmethod - def identity(self, host, tcp_port, ssl_port, suffix): - '''Returns a NetIdentity object. Unpublished ports are None.''' - return NetIdentity(host, tcp_port or None, ssl_port or None, suffix) - - @classmethod - def real_name(cls, identity): + def real_name(self, host, protocol_version, tcp_port, ssl_port): '''Real name as used on IRC.''' - def port_text(letter, port): - if not port: - return '' - if port == cls.DEFAULT_PORTS.get(letter): - return ' ' + letter - else: - return ' ' + letter + str(port) + default_ports = self.env.coin.PEER_DEFAULT_PORTS - tcp = port_text('t', identity.tcp_port) - ssl = port_text('s', identity.ssl_port) - return '{} v{}{}{}'.format(identity.host, cls.VERSION, tcp, ssl) + def port_text(letter, port): + if port == default_ports.get(letter): + return letter + else: + return letter + str(port) + + parts = [host, 'v' + protocol_version] + for letter, port in (('s', ssl_port), ('t', tcp_port)): + if port: + parts.append(port_text(letter, port)) + return ' '.join(parts) + + def irc_name_pairs(self): + return [(self.real_name(identity.host, self.PROTOCOL_VERSION, + identity.tcp_port, identity.ssl_port), + identity.nick_suffix) + for identity in self._identities] def identities(self): '''Return a list of network identities of this server.''' return self._identities - async def refresh_peer_subs(self): - for n in itertools.count(): - await asyncio.sleep(60) - updates = [self.irc_peers[nick] for nick in self.updated_nicks - if nick in self.irc_peers] - if updates: - self.controller.notify_peers(updates) - self.updated_nicks.clear() + def ensure_future(self, coro, callback=None): + '''Schedule the coro to be run.''' + return self.controller.ensure_future(coro, callback=callback) async def main_loop(self): '''Not a loop for now...''' - self.controller.ensure_future(self.refresh_peer_subs()) if self.env.irc: - name_pairs = [(self.real_name(identity), identity.nick_suffix) - for identity in self._identities] - self.controller.ensure_future(self.irc.start(name_pairs)) + self.ensure_future(self.irc.start(self.irc_name_pairs())) else: self.logger.info('IRC is disabled') @@ -102,7 +84,6 @@ class PeerManager(util.LoggedClass): except socket.error: pass # IPv6? ip_addr = ip_addr or hostname - self.updated_nicks.add(nick) self.irc_peers[nick] = IRCPeer(ip_addr, hostname, details) self.logger.info('new IRC peer {} at {} ({})' .format(nick, hostname, details)) @@ -122,9 +103,9 @@ class PeerManager(util.LoggedClass): def count(self): return len(self.irc_peers) - def peer_dict(self): + def rpc_data(self): return self.irc_peers - def peer_list(self): + def on_peers_subscribe(self): '''Returns the server peers as a list of (ip, host, details) tuples.''' return list(self.irc_peers.values()) diff --git a/server/session.py b/server/session.py index a4e7fd0..47f5f66 100644 --- a/server/session.py +++ b/server/session.py @@ -8,7 +8,6 @@ '''Classes for local RPC server and remote client TCP/SSL servers.''' import time -import traceback from functools import partial from lib.jsonrpc import JSONSession, RPCError @@ -52,16 +51,8 @@ class SessionBase(JSONSession): super().close_connection() def peername(self, *, for_log=True): - '''Return the peer name of this connection.''' - peer_info = self.peer_info() - if not peer_info: - return 'unknown' - if for_log and self.anon_logs: - return 'xx.xx.xx.xx:xx' - if ':' in peer_info[0]: - return '[{}]:{}'.format(peer_info[0], peer_info[1]) - else: - return '{}:{}'.format(peer_info[0], peer_info[1]) + '''Return the peer address and port.''' + return self.peer_addr(anon=for_log and self.anon_logs) def flags(self): '''Status flags.''' @@ -104,7 +95,6 @@ class ElectrumX(SessionBase): super().__init__(*args, **kwargs) self.subscribe_headers = False self.subscribe_height = False - self.subscribe_peers = False self.notified_height = None self.max_send = self.env.max_send self.max_subs = self.env.max_session_subs @@ -169,22 +159,9 @@ class ElectrumX(SessionBase): self.subscribe_height = True return self.height() - def peers_subscribe(self, incremental=False): - '''Returns the server peers as a list of (ip, host, details) tuples. - - If incremental is False there is no subscription. If True the - remote side will receive notifications of new or modified - peers (peers that disappeared are not notified). - ''' - self.subscribe_peers = incremental - return self.controller.peers.peer_list() - - def notify_peers(self, updates): - '''Notify of peer updates. Updates are sent as a list in the same - format as the subscription reply, as the first parameter. - ''' - if self.subscribe_peers: - self.send_notification('server.peers.subscribe', [updates]) + def peers_subscribe(self): + '''Return the server peers as a list of (ip, host, details) tuples.''' + return self.controller.peer_mgr.on_peers_subscribe() async def address_subscribe(self, address): '''Subscribe to an address. @@ -201,16 +178,17 @@ class ElectrumX(SessionBase): def server_features(self): '''Returns a dictionary of server features.''' - peers = self.controller.peers + peer_mgr = self.controller.peer_mgr hosts = {identity.host: { 'tcp_port': identity.tcp_port, 'ssl_port': identity.ssl_port, - 'pruning': peers.pruning, - 'version': peers.VERSION, - } for identity in self.controller.peers.identities()} + } for identity in peer_mgr.identities()} return { 'hosts': hosts, + 'pruning': peer_mgr.pruning, + 'protocol_version': peer_mgr.PROTOCOL_VERSION, + 'server_version': VERSION, } def server_version(self, client_name=None, protocol_version=None): @@ -220,7 +198,7 @@ class ElectrumX(SessionBase): protocol_version: the protocol version spoken by the client ''' if client_name: - self.client = str(client_name)[:15] + self.client = str(client_name)[:17] if protocol_version is not None: self.protocol_version = protocol_version return VERSION