Merge branch 'develop'

This commit is contained in:
Neil Booth 2017-01-23 07:28:27 +09:00
commit 2266b2e2f5
14 changed files with 237 additions and 147 deletions

View File

@ -137,6 +137,14 @@ version prior to the release of 1.0.
ChangeLog
=========
Version 0.10.10
---------------
* move peer management from irc.py to peers.py. This is preparataion
for peer discovery without IRC.
* misc cleanups
* fix Litecoin genesis hash (petrkr)
Version 0.10.9
--------------

View File

@ -239,8 +239,9 @@ connectivity on IRC:
* **REPORT_HOST_TOR**
The tor .onion address to advertise. If set, an additional
connection to IRC happens with '_tor" appended to **IRC_NICK**.
The tor address to advertise; must end with `.onion`. If set, an
additional connection to IRC happens with '_tor' appended to
**IRC_NICK**.
* **REPORT_TCP_PORT_TOR**

View File

@ -35,8 +35,7 @@ class RPCClient(JSONRPC):
self.max_buffer_size = 5000000
if params:
params = [params]
payload = self.request_payload(method, id_=method, params=params)
self.encode_and_send_payload(payload)
self.send_request(method, method, params)
future = asyncio.ensure_future(self.queue.get())
for f in asyncio.as_completed([future], timeout=timeout):
@ -80,6 +79,7 @@ def main():
except OSError:
print('error connecting - is ElectrumX catching up or not running?')
finally:
loop.stop()
loop.close()

View File

@ -1,4 +1,4 @@
# Copyright (c) 2016, Neil Booth
# Copyright (c) 2016-2017, Neil Booth
#
# All rights reserved.
#

View File

@ -13,6 +13,7 @@ import json
import numbers
import time
import traceback
from functools import partial
from lib.util import LoggedClass
@ -121,7 +122,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
NEXT_SESSION_ID = 0
@classmethod
def request_payload(cls, method, id_, params=None):
def request_payload(cls, id_, method, params=None):
payload = {'jsonrpc': '2.0', 'id': id_, 'method': method}
if params:
payload['params'] = params
@ -131,10 +132,6 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
def response_payload(cls, result, id_):
return {'jsonrpc': '2.0', 'result': result, 'id': id_}
@classmethod
def notification_payload(cls, method, params=None):
return cls.request_payload(method, None, params)
@classmethod
def error_payload(cls, message, code, id_=None):
error = {'message': message, 'code': code}
@ -166,6 +163,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
def __init__(self):
super().__init__()
self.send_notification = partial(self.send_request, None)
self.start = time.time()
self.stop = 0
self.last_recv = self.start
@ -316,6 +314,18 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
'''Send a JSON error.'''
self._send_bytes(self.json_error_bytes(message, code, id_))
def send_request(self, id_, method, params=None):
'''Send a request. If id_ is None it is a notification.'''
self.encode_and_send_payload(self.request_payload(id_, method, params))
def send_notifications(self, mp_iterable):
'''Send an iterable of (method, params) notification pairs.
A 1-tuple is also valid in which case there are no params.'''
# TODO: maybe send batches if remote side supports it
for pair in mp_iterable:
self.send_notification(*pair)
def encode_payload(self, payload):
assert isinstance(payload, dict)
@ -353,10 +363,6 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
'''Encode the payload and send it.'''
self._send_bytes(self.encode_payload(payload))
def json_notification_bytes(self, method, params):
'''Return the bytes of a json notification.'''
return self.encode_payload(self.notification_payload(method, params))
def json_request_bytes(self, method, id_, params=None):
'''Return the bytes of a JSON request.'''
return self.encode_payload(self.request_payload(method, id_, params))

View File

@ -1,4 +1,4 @@
# Copyright (c) 2016, Neil Booth
# Copyright (c) 2016-2017, Neil Booth
#
# All rights reserved.
#

View File

@ -1,4 +1,4 @@
# Copyright (c) 2016, Neil Booth
# Copyright (c) 2016-2017, Neil Booth
#
# All rights reserved.
#

View File

@ -1,4 +1,4 @@
# Copyright (c) 2016, Neil Booth
# Copyright (c) 2016-2017, Neil Booth
#
# All rights reserved.
#

View File

@ -23,8 +23,8 @@ from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash
import lib.util as util
from server.block_processor import BlockProcessor
from server.daemon import Daemon, DaemonError
from server.irc import IRC
from server.session import LocalRPC, ElectrumX
from server.peers import PeerManager
from server.mempool import MemPool
from server.version import VERSION
@ -61,7 +61,7 @@ class Controller(util.LoggedClass):
self.daemon = Daemon(env.coin.daemon_urls(env.daemon_url))
self.bp = BlockProcessor(env, self.daemon)
self.mempool = MemPool(self.bp)
self.irc = IRC(env)
self.peers = PeerManager(env)
self.env = env
self.servers = {}
# Map of session to the key of its list in self.groups
@ -73,7 +73,8 @@ class Controller(util.LoggedClass):
self.max_sessions = env.max_sessions
self.low_watermark = self.max_sessions * 19 // 20
self.max_subs = env.max_subs
self.subscription_count = 0
# Cache some idea of room to avoid recounting on each subscription
self.subs_room = 0
self.next_stale_check = 0
self.history_cache = pylru.lrucache(256)
self.header_cache = pylru.lrucache(8)
@ -95,12 +96,14 @@ class Controller(util.LoggedClass):
'block.get_header block.get_chunk estimatefee relayfee '
'transaction.get transaction.get_merkle utxo.get_address'),
('server',
'banner donation_address peers.subscribe'),
'banner donation_address'),
]
self.electrumx_handlers = {'.'.join([prefix, suffix]):
getattr(self, suffix.replace('.', '_'))
for prefix, suffixes in rpcs
for suffix in suffixes.split()}
handlers = {'.'.join([prefix, suffix]):
getattr(self, suffix.replace('.', '_'))
for prefix, suffixes in rpcs
for suffix in suffixes.split()}
handlers['server.peers.subscribe'] = self.peers.subscribe
self.electrumx_handlers = handlers
async def mempool_transactions(self, hashX):
'''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
@ -206,11 +209,11 @@ class Controller(util.LoggedClass):
async def await_bp_catchup():
'''Wait for the block processor to catch up.
When it has, start the servers and connect to IRC.
Then start the servers and the peer manager.
'''
await self.bp.caught_up_event.wait()
self.logger.info('block processor has caught up')
add_future(self.irc.start())
add_future(self.peers.main_loop())
add_future(self.start_servers())
add_future(self.mempool.main_loop())
add_future(self.enqueue_delayed_sessions())
@ -374,7 +377,6 @@ class Controller(util.LoggedClass):
gid = self.sessions.pop(session)
assert gid in self.groups
self.groups[gid].remove(session)
self.subscription_count -= session.sub_count()
def close_session(self, session):
'''Close the session's transport and cancel its future.'''
@ -433,13 +435,16 @@ class Controller(util.LoggedClass):
'logged': len([s for s in self.sessions if s.log_me]),
'paused': sum(s.pause for s in self.sessions),
'pid': os.getpid(),
'peers': len(self.irc.peers),
'peers': self.peers.count(),
'requests': sum(s.requests_remaining() for s in self.sessions),
'sessions': self.session_count(),
'subs': self.subscription_count,
'subs': self.sub_count(),
'txs_sent': self.txs_sent,
}
def sub_count(self):
return sum(s.sub_count() for s in self.sessions)
@staticmethod
def text_lines(method, data):
if method == 'sessions':
@ -590,7 +595,7 @@ class Controller(util.LoggedClass):
async def rpc_peers(self):
'''Return a list of server peers, currently taken from IRC.'''
return self.irc.peers
return self.peers.peer_list()
async def rpc_reorg(self, count=3):
'''Force a reorg of the given number of blocks.
@ -642,10 +647,12 @@ class Controller(util.LoggedClass):
raise RPCError('daemon error: {}'.format(e))
async def new_subscription(self, address):
if self.subscription_count >= self.max_subs:
raise RPCError('server subscription limit {:,d} reached'
.format(self.max_subs))
self.subscription_count += 1
if self.subs_room <= 0:
self.subs_room = self.max_subs - self.sub_count()
if self.subs_room <= 0:
raise RPCError('server subscription limit {:,d} reached'
.format(self.max_subs))
self.subs_room -= 1
hashX = self.address_to_hashX(address)
status = await self.address_status(hashX)
return hashX, status
@ -875,9 +882,3 @@ class Controller(util.LoggedClass):
async def donation_address(self):
'''Return the donation address as a string, empty if there is none.'''
return self.env.donation_address
async def peers_subscribe(self):
'''Returns the server peers as a list of (ip, host, ports) tuples.
Despite the name this is not currently treated as a subscription.'''
return list(self.irc.peers.values())

View File

@ -66,7 +66,7 @@ class Env(LoggedClass):
self.report_ssl_port
if self.report_ssl_port else
self.ssl_port)
self.report_host_tor = self.default('REPORT_HOST_TOR', None)
self.report_host_tor = self.default('REPORT_HOST_TOR', '')
def default(self, envvar, default):
return environ.get(envvar, default)

View File

@ -12,7 +12,6 @@ Only calling start() requires the IRC Python module.
import asyncio
import re
import socket
from collections import namedtuple
@ -22,52 +21,26 @@ from lib.util import LoggedClass
class IRC(LoggedClass):
Peer = namedtuple('Peer', 'ip_addr host ports')
class DisconnectedError(Exception):
pass
def __init__(self, env):
def __init__(self, env, peer_mgr):
super().__init__()
self.env = env
self.coin = env.coin
self.peer_mgr = peer_mgr
# If this isn't something a peer or client expects
# then you won't appear in the client's network dialog box
irc_address = (env.coin.IRC_SERVER, env.coin.IRC_PORT)
self.channel = env.coin.IRC_CHANNEL
self.prefix = env.coin.IRC_PREFIX
self.clients = []
self.nick = '{}{}'.format(self.prefix,
env.irc_nick if env.irc_nick else
double_sha256(env.report_host.encode())
[:5].hex())
self.clients.append(IrcClient(irc_address, self.nick,
env.report_host,
env.report_tcp_port,
env.report_ssl_port))
if env.report_host_tor:
self.clients.append(IrcClient(irc_address, self.nick + '_tor',
env.report_host_tor,
env.report_tcp_port_tor,
env.report_ssl_port_tor))
self.peer_regexp = re.compile('({}[^!]*)!'.format(self.prefix))
self.peers = {}
async def start(self):
async def start(self, name_pairs):
'''Start IRC connections if enabled in environment.'''
try:
if self.env.irc:
await self.join()
else:
self.logger.info('IRC is disabled')
except asyncio.CancelledError:
pass
except Exception as e:
self.logger.error(str(e))
async def join(self):
import irc.client as irc_client
from jaraco.stream import buffer
@ -77,21 +50,18 @@ class IRC(LoggedClass):
# Register handlers for events we're interested in
reactor = irc_client.Reactor()
for event in 'welcome join quit kick whoreply disconnect'.split():
for event in 'welcome join quit whoreply disconnect'.split():
reactor.add_global_handler(event, getattr(self, 'on_' + event))
# Note: Multiple nicks in same channel will trigger duplicate events
for client in self.clients:
client.connection = reactor.server()
clients = [IrcClient(self.coin, real_name, self.nick + suffix,
reactor.server())
for (real_name, suffix) in name_pairs]
while True:
try:
for client in self.clients:
self.logger.info('Joining IRC in {} as "{}" with '
'real name "{}"'
.format(self.channel, client.nick,
client.realname))
client.connect()
for client in clients:
client.connect(self)
while True:
reactor.process_once()
await asyncio.sleep(2)
@ -130,14 +100,7 @@ class IRC(LoggedClass):
'''Called when someone leaves our channel.'''
match = self.peer_regexp.match(event.source)
if match:
self.peers.pop(match.group(1), None)
def on_kick(self, connection, event):
'''Called when someone is kicked from our channel.'''
self.log_event(event)
match = self.peer_regexp.match(event.arguments[0])
if match:
self.peers.pop(match.group(1), None)
self.peer_mgr.remove_irc_peer(match.group(1))
def on_whoreply(self, connection, event):
'''Called when a response to our who requests arrives.
@ -145,50 +108,25 @@ class IRC(LoggedClass):
The nick is the 4th argument, and real name is in the 6th
argument preceeded by '0 ' for some reason.
'''
try:
nick = event.arguments[4]
if nick.startswith(self.prefix):
line = event.arguments[6].split()
try:
ip_addr = socket.gethostbyname(line[1])
except socket.error:
# Could be .onion or IPv6.
ip_addr = line[1]
peer = self.Peer(ip_addr, line[1], line[2:])
self.peers[nick] = peer
except (IndexError, UnicodeError):
# UnicodeError comes from invalid domains (issue #68)
pass
nick = event.arguments[4]
if nick.startswith(self.prefix):
line = event.arguments[6].split()
hostname, details = line[1], line[2:]
self.peer_mgr.add_irc_peer(nick, hostname, details)
class IrcClient(LoggedClass):
class IrcClient(object):
VERSION = '1.0'
DEFAULT_PORTS = {'t': 50001, 's': 50002}
def __init__(self, irc_address, nick, host, tcp_port, ssl_port):
super().__init__()
self.irc_host, self.irc_port = irc_address
def __init__(self, coin, real_name, nick, server):
self.irc_host = coin.IRC_SERVER
self.irc_port = coin.IRC_PORT
self.nick = nick
self.realname = self.create_realname(host, tcp_port, ssl_port)
self.connection = None
self.real_name = real_name
self.server = server
def connect(self, keepalive=60):
def connect(self, irc):
'''Connect this client to its IRC server'''
self.connection.connect(self.irc_host, self.irc_port, self.nick,
ircname=self.realname)
self.connection.set_keepalive(keepalive)
@classmethod
def create_realname(cls, host, tcp_port, ssl_port):
def port_text(letter, port):
if not port:
return ''
if port == cls.DEFAULT_PORTS.get(letter):
return ' ' + letter
else:
return ' ' + letter + str(port)
tcp = port_text('t', tcp_port)
ssl = port_text('s', ssl_port)
return '{} v{}{}{}'.format(host, cls.VERSION, tcp, ssl)
irc.logger.info('joining {} as "{}" with real name "{}"'
.format(irc.channel, self.nick, self.real_name))
self.server.connect(self.irc_host, self.irc_port, self.nick,
ircname=self.real_name)

139
server/peers.py Normal file
View File

@ -0,0 +1,139 @@
# Copyright (c) 2017, Neil Booth
#
# All rights reserved.
#
# See the file "LICENCE" for information about the copyright
# and warranty status of this software.
'''Peer management.'''
import asyncio
import socket
import traceback
from collections import namedtuple
from functools import partial
import lib.util as util
from server.irc import IRC
NetIdentity = namedtuple('NetIdentity', 'host tcp_port ssl_port nick_suffix')
IRCPeer = namedtuple('IRCPeer', 'ip_addr host details')
class PeerManager(util.LoggedClass):
'''Looks after the DB of peer network servers.
Attempts to maintain a connection with up to 8 peers.
Issues a 'peers.subscribe' RPC to them and tells them our data.
'''
VERSION = '1.0'
DEFAULT_PORTS = {'t': 50001, 's': 50002}
def __init__(self, env):
super().__init__()
self.env = env
self.loop = asyncio.get_event_loop()
self.irc = IRC(env, self)
self.futures = set()
self.identities = []
# Keyed by nick
self.irc_peers = {}
# We can have a Tor identity inaddition to a normal one
self.identities.append(NetIdentity(env.report_host,
env.report_tcp_port,
env.report_ssl_port,
''))
if env.report_host_tor.endswith('.onion'):
self.identities.append(NetIdentity(env.report_host_tor,
env.report_tcp_port_tor,
env.report_ssl_port_tor,
'_tor'))
async def executor(self, func, *args, **kwargs):
'''Run func taking args in the executor.'''
await self.loop.run_in_executor(None, partial(func, *args, **kwargs))
@classmethod
def real_name(cls, identity):
'''Real name as used on IRC.'''
def port_text(letter, port):
if not port:
return ''
if port == cls.DEFAULT_PORTS.get(letter):
return ' ' + letter
else:
return ' ' + letter + str(port)
tcp = port_text('t', identity.tcp_port)
ssl = port_text('s', identity.ssl_port)
return '{} v{}{}{}'.format(identity.host, cls.VERSION, tcp, ssl)
def ensure_future(self, coro):
'''Convert a coro into a future and add it to our pending list
to be waited for.'''
self.futures.add(asyncio.ensure_future(coro))
def start_irc(self):
'''Start up the IRC connections if enabled.'''
if self.env.irc:
name_pairs = [(self.real_name(identity), identity.nick_suffix)
for identity in self.identities]
self.ensure_future(self.irc.start(name_pairs))
else:
self.logger.info('IRC is disabled')
async def main_loop(self):
'''Start and then enter the main loop.'''
self.start_irc()
try:
while True:
await asyncio.sleep(10)
done = [future for future in self.futures if future.done()]
self.futures.difference_update(done)
for future in done:
try:
future.result()
except:
self.log_error(traceback.format_exc())
finally:
for future in self.futures:
future.cancel()
def dns_lookup_peer(self, nick, hostname, details):
try:
ip_addr = None
try:
ip_addr = socket.gethostbyname(hostname)
except socket.error:
pass # IPv6?
ip_addr = ip_addr or hostname
self.irc_peers[nick] = IRCPeer(ip_addr, hostname, details)
self.logger.info('new IRC peer {} at {} ({})'
.format(nick, hostname, details))
except UnicodeError:
# UnicodeError comes from invalid domains (issue #68)
self.logger.info('IRC peer domain {} invalid'.format(hostname))
def add_irc_peer(self, *args):
'''Schedule DNS lookup of peer.'''
self.ensure_future(self.executor(self.dns_lookup_peer, *args))
def remove_irc_peer(self, nick):
'''Remove a peer from our IRC peers map.'''
self.logger.info('removing IRC peer {}'.format(nick))
self.irc_peers.pop(nick, None)
def count(self):
return len(self.irc_peers)
def peer_list(self):
return self.irc_peers
async def subscribe(self):
'''Returns the server peers as a list of (ip, host, details) tuples.
Despite the name this is not currently treated as a subscription.'''
return list(self.irc_peers.values())

View File

@ -135,32 +135,29 @@ class ElectrumX(Session):
Cache is a shared cache for this update.
'''
controller = self.controller
pairs = []
if height != self.notified_height:
self.notified_height = height
if self.subscribe_headers:
payload = self.notification_payload(
'blockchain.headers.subscribe',
(self.controller.electrum_header(height), ),
)
self.encode_and_send_payload(payload)
args = (controller.electrum_header(height), )
pairs.append(('blockchain.headers.subscribe', args))
if self.subscribe_height:
payload = self.notification_payload(
'blockchain.numblocks.subscribe',
(height, ),
)
self.encode_and_send_payload(payload)
pairs.append(('blockchain.numblocks.subscribe', (height, )))
matches = touched.intersection(self.hashX_subs)
for hashX in matches:
address = self.hashX_subs[hashX]
status = await self.controller.address_status(hashX)
payload = self.notification_payload(
'blockchain.address.subscribe', (address, status))
self.encode_and_send_payload(payload)
status = await controller.address_status(hashX)
pairs.append(('blockchain.address.subscribe', (address, status)))
self.send_notifications(pairs)
if matches:
self.log_info('notified of {:,d} addresses'.format(len(matches)))
es = '' if len(matches) == 1 else 'es'
self.log_info('notified of {:,d} address{}'
.format(len(matches), es))
def height(self):
'''Return the current flushed database height.'''

View File

@ -1 +1 @@
VERSION = "ElectrumX 0.10.9"
VERSION = "ElectrumX 0.10.10"