From 23c46167114ead57e36b1bd7dcf2c30c1f03a744 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Wed, 18 Jan 2017 08:00:30 +0900 Subject: [PATCH 01/12] Fix testcase --- server/storage.py | 4 ++-- tests/test_util.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/server/storage.py b/server/storage.py index fedcfe3..074d8a3 100644 --- a/server/storage.py +++ b/server/storage.py @@ -10,11 +10,11 @@ import os from functools import partial -from lib.util import subclasses, increment_byte_string +import lib.util as util def db_class(name): '''Returns a DB engine class.''' - for db_class in subclasses(Storage): + for db_class in util.subclasses(Storage): if db_class.__name__.lower() == name.lower(): db_class.import_module() return db_class diff --git a/tests/test_util.py b/tests/test_util.py index a6b79ef..bb98c83 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -56,4 +56,4 @@ def test_chunks(): def test_increment_byte_string(): assert util.increment_byte_string(b'1') == b'2' assert util.increment_byte_string(b'\x01\x01') == b'\x01\x02' - assert util.increment_byte_string(b'\xff\xff') == b'\x01\x00\x00' + assert util.increment_byte_string(b'\xff\xff') == None From b03a44ca5afb587f12ce205b99d5311905097adb Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Wed, 18 Jan 2017 08:04:14 +0900 Subject: [PATCH 02/12] Prepare 0.10.8 --- README.rst | 6 ++++++ server/version.py | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/README.rst b/README.rst index 757de75..10ff0ef 100644 --- a/README.rst +++ b/README.rst @@ -137,6 +137,12 @@ version prior to the release of 1.0. ChangeLog ========= +Version 0.10.8 +-------------- + +* fix import for reverse iterator for RocksDB +* fix tests + Version 0.10.7 -------------- diff --git a/server/version.py b/server/version.py index b8afe92..6a12211 100644 --- a/server/version.py +++ b/server/version.py @@ -1 +1 @@ -VERSION = "ElectrumX 0.10.7" +VERSION = "ElectrumX 0.10.8" From 9a8c598fa808b1325c184ab4969b68b9f3a7f753 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Wed, 18 Jan 2017 19:50:26 +0900 Subject: [PATCH 03/12] The session needs to handle the version command. --- server/controller.py | 14 +------------- server/session.py | 14 ++++++++++++++ 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/server/controller.py b/server/controller.py index 76aa455..0b6e5f3 100644 --- a/server/controller.py +++ b/server/controller.py @@ -95,7 +95,7 @@ class Controller(util.LoggedClass): 'block.get_header block.get_chunk estimatefee relayfee ' 'transaction.get transaction.get_merkle utxo.get_address'), ('server', - 'banner donation_address peers.subscribe version'), + 'banner donation_address peers.subscribe'), ] self.electrumx_handlers = {'.'.join([prefix, suffix]): getattr(self, suffix.replace('.', '_')) @@ -886,15 +886,3 @@ class Controller(util.LoggedClass): Despite the name this is not currently treated as a subscription.''' return list(self.irc.peers.values()) - - async def version(self, client_name=None, protocol_version=None): - '''Returns the server version as a string. 
-
-        client_name: a string identifying the client
-        protocol_version: the protocol version spoken by the client
-        '''
-        if client_name:
-            self.client = str(client_name)[:15]
-        if protocol_version is not None:
-            self.protocol_version = protocol_version
-        return VERSION
diff --git a/server/session.py b/server/session.py
index d646c4f..23cf9dc 100644
--- a/server/session.py
+++ b/server/session.py
@@ -13,6 +13,7 @@ import traceback
 
 from lib.jsonrpc import JSONRPC, RPCError
 from server.daemon import DaemonError
+from server.version import VERSION
 
 
 class Session(JSONRPC):
@@ -123,6 +124,7 @@ class ElectrumX(Session):
             'blockchain.headers.subscribe': self.headers_subscribe,
             'blockchain.numblocks.subscribe': self.numblocks_subscribe,
             'blockchain.transaction.broadcast': self.transaction_broadcast,
+            'server.version': self.version,
         }
 
     def sub_count(self):
@@ -191,6 +193,18 @@ class ElectrumX(Session):
             self.hashX_subs[hashX] = address
         return status
 
+    async def version(self, client_name=None, protocol_version=None):
+        '''Returns the server version as a string.
+
+        client_name: a string identifying the client
+        protocol_version: the protocol version spoken by the client
+        '''
+        if client_name:
+            self.client = str(client_name)[:15]
+        if protocol_version is not None:
+            self.protocol_version = protocol_version
+        return VERSION
+
     async def transaction_broadcast(self, raw_tx):
         '''Broadcast a raw transaction to the network.
 

From 4729ba2e210966e38185a82e978d23a3026905c3 Mon Sep 17 00:00:00 2001
From: Neil Booth
Date: Wed, 18 Jan 2017 20:28:57 +0900
Subject: [PATCH 04/12] Tweak the shutdown process

Clean it up a bit and make it harder to get wrong.
---
 server/block_processor.py |  7 +++++--
 server/controller.py      | 29 ++++++++++++-----------------
 2 files changed, 17 insertions(+), 19 deletions(-)

diff --git a/server/block_processor.py b/server/block_processor.py
index 6efabbb..ea8fb8c 100644
--- a/server/block_processor.py
+++ b/server/block_processor.py
@@ -196,11 +196,14 @@ class BlockProcessor(server.db.DB):
             task = await self.task_queue.get()
             await task()
 
-    def shutdown(self):
+    def shutdown(self, executor):
+        '''Shutdown cleanly and flush to disk.'''
+        # First shut down the executor; it may be processing a block.
+        # Then we can flush anything remaining to disk.
+        executor.shutdown()
         if self.height != self.db_height:
             self.logger.info('flushing state to DB for a clean shutdown...')
             self.flush(True)
-        self.logger.info('shutdown complete')
 
     async def executor(self, func, *args, **kwargs):
         '''Run func taking args in the executor.'''
diff --git a/server/controller.py b/server/controller.py
index 0b6e5f3..bfc49ee 100644
--- a/server/controller.py
+++ b/server/controller.py
@@ -225,7 +225,13 @@ class Controller(util.LoggedClass):
 
         # Perform a clean shutdown when this event is signalled.
await self.shutdown_event.wait() - self.logger.info('shutting down gracefully') + + self.logger.info('shutting down') + await self.shutdown(futures) + self.logger.info('shutdown complete') + + async def shutdown(self, futures): + '''Perform the shutdown sequence.''' self.state = self.SHUTTING_DOWN # Close servers and sessions @@ -237,11 +243,12 @@ class Controller(util.LoggedClass): for future in futures: future.cancel() - await asyncio.wait(futures) + # Wait for all futures to finish + while any(not future.done() for future in futures): + await asyncio.sleep(1) - # Wait for the executor to finish anything it's doing - self.executor.shutdown() - self.bp.shutdown() + # Finally shut down the block processor and executor + self.bp.shutdown(self.executor) def close_servers(self, kinds): '''Close the servers of the given kinds (TCP etc.).''' @@ -253,18 +260,6 @@ class Controller(util.LoggedClass): if server: server.close() - async def wait_for_sessions(self, secs=30): - if not self.sessions: - return - self.logger.info('waiting up to {:d} seconds for socket cleanup' - .format(secs)) - limit = time.time() + secs - while self.sessions and time.time() < limit: - self.clear_stale_sessions(grace=secs//2) - await asyncio.sleep(2) - self.logger.info('{:,d} sessions remaining' - .format(len(self.sessions))) - async def start_server(self, kind, *args, **kw_args): protocol_class = LocalRPC if kind == 'RPC' else ElectrumX protocol = partial(protocol_class, self, self.bp, self.env, kind) From 112e0a12a9d9235bde35b91e963311ec3b561bb3 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Wed, 18 Jan 2017 20:33:45 +0900 Subject: [PATCH 05/12] Prepare 0.10.9 --- README.rst | 50 ++++++----------------------------------------- server/version.py | 2 +- 2 files changed, 7 insertions(+), 45 deletions(-) diff --git a/README.rst b/README.rst index 10ff0ef..d9a5c4a 100644 --- a/README.rst +++ b/README.rst @@ -137,6 +137,12 @@ version prior to the release of 1.0. ChangeLog ========= +Version 0.10.9 +-------------- + +* restore client to sessions output +* cleanup shutdown process; hopefully this resolves the log spew for good + Version 0.10.8 -------------- @@ -226,46 +232,6 @@ variables to use roughly the same amount of memory. For now this code should be considered experimental; if you want stability please stick with the 0.9 series. -Version 0.9.23 --------------- - -* Backport of the fix for issue `#94#` - stale references to old - sessions. This would effectively memory and network handles. - -Version 0.9.22 --------------- - -* documentation updates (ARCHITECTURE.rst, ENVIRONMENT.rst) only. - -Version 0.9.21 --------------- - -* moved RELEASE-NOTES into this README -* document the RPC interface in docs/RPC-INTERFACE.rst -* clean up open DB handling, issue `#89`_ - -Version 0.9.20 --------------- - -* fix for IRC flood issue `#93`_ - -Version 0.9.19 --------------- - -* move sleep outside semaphore (issue `#88`_) - -Version 0.9.18 --------------- - -* last release of 2016. Just a couple of minor tweaks to logging. - -Version 0.9.17 --------------- - -* have all the DBs use fsync on write; hopefully means DB won't corrupt in - case of a kernel panic (issue `#75`_) -* replace $DONATION_ADDRESS in banner file - **Neil Booth** kyuupichan@gmail.com https://github.com/kyuupichan @@ -273,11 +239,7 @@ Version 0.9.17 .. _#72: https://github.com/kyuupichan/electrumx/issues/72 -.. _#75: https://github.com/kyuupichan/electrumx/issues/75 -.. _#88: https://github.com/kyuupichan/electrumx/issues/88 -.. 
_#89: https://github.com/kyuupichan/electrumx/issues/89 .. _#92: https://github.com/kyuupichan/electrumx/issues/92 -.. _#93: https://github.com/kyuupichan/electrumx/issues/93 .. _#94: https://github.com/kyuupichan/electrumx/issues/94 .. _#99: https://github.com/kyuupichan/electrumx/issues/99 .. _#100: https://github.com/kyuupichan/electrumx/issues/100 diff --git a/server/version.py b/server/version.py index 6a12211..22f8ce3 100644 --- a/server/version.py +++ b/server/version.py @@ -1 +1 @@ -VERSION = "ElectrumX 0.10.8" +VERSION = "ElectrumX 0.10.9" From 766da5ed7926dec6fb4d4d18f467a49663c76f93 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Fri, 20 Jan 2017 08:09:33 +0900 Subject: [PATCH 06/12] Don't maintain a sub_count in controller Just do a tally occasionally. It's too tricky to keep it accurate as it double-counts resubscribing to the same address, for example --- lib/hash.py | 2 +- lib/script.py | 2 +- lib/tx.py | 2 +- lib/util.py | 2 +- server/controller.py | 19 ++++++++++++------- 5 files changed, 16 insertions(+), 11 deletions(-) diff --git a/lib/hash.py b/lib/hash.py index 1563adc..7ab28af 100644 --- a/lib/hash.py +++ b/lib/hash.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016, Neil Booth +# Copyright (c) 2016-2017, Neil Booth # # All rights reserved. # diff --git a/lib/script.py b/lib/script.py index 4c2b65a..d29ce1a 100644 --- a/lib/script.py +++ b/lib/script.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016, Neil Booth +# Copyright (c) 2016-2017, Neil Booth # # All rights reserved. # diff --git a/lib/tx.py b/lib/tx.py index 565595e..31d2186 100644 --- a/lib/tx.py +++ b/lib/tx.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016, Neil Booth +# Copyright (c) 2016-2017, Neil Booth # # All rights reserved. # diff --git a/lib/util.py b/lib/util.py index cc755b8..54f40e1 100644 --- a/lib/util.py +++ b/lib/util.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016, Neil Booth +# Copyright (c) 2016-2017, Neil Booth # # All rights reserved. 
# diff --git a/server/controller.py b/server/controller.py index bfc49ee..015b2d1 100644 --- a/server/controller.py +++ b/server/controller.py @@ -73,7 +73,8 @@ class Controller(util.LoggedClass): self.max_sessions = env.max_sessions self.low_watermark = self.max_sessions * 19 // 20 self.max_subs = env.max_subs - self.subscription_count = 0 + # Cache some idea of room to avoid recounting on each subscription + self.subs_room = 0 self.next_stale_check = 0 self.history_cache = pylru.lrucache(256) self.header_cache = pylru.lrucache(8) @@ -374,7 +375,6 @@ class Controller(util.LoggedClass): gid = self.sessions.pop(session) assert gid in self.groups self.groups[gid].remove(session) - self.subscription_count -= session.sub_count() def close_session(self, session): '''Close the session's transport and cancel its future.''' @@ -436,10 +436,13 @@ class Controller(util.LoggedClass): 'peers': len(self.irc.peers), 'requests': sum(s.requests_remaining() for s in self.sessions), 'sessions': self.session_count(), - 'subs': self.subscription_count, + 'subs': self.sub_count(), 'txs_sent': self.txs_sent, } + def sub_count(self): + return sum(s.sub_count() for s in self.sessions) + @staticmethod def text_lines(method, data): if method == 'sessions': @@ -642,10 +645,12 @@ class Controller(util.LoggedClass): raise RPCError('daemon error: {}'.format(e)) async def new_subscription(self, address): - if self.subscription_count >= self.max_subs: - raise RPCError('server subscription limit {:,d} reached' - .format(self.max_subs)) - self.subscription_count += 1 + if self.subs_room <= 0: + self.subs_room = self.max_subs - self.sub_count() + if self.subs_room <= 0: + raise RPCError('server subscription limit {:,d} reached' + .format(self.max_subs)) + self.subs_room -= 1 hashX = self.address_to_hashX(address) status = await self.address_status(hashX) return hashX, status From ed0646efbf7b239f5e54a33572a87a4d0c233550 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Fri, 20 Jan 2017 19:30:55 +0900 Subject: [PATCH 07/12] Tidy up request and notification sending --- electrumx_rpc.py | 4 ++-- lib/jsonrpc.py | 24 +++++++++++++++--------- server/session.py | 27 ++++++++++++--------------- 3 files changed, 29 insertions(+), 26 deletions(-) diff --git a/electrumx_rpc.py b/electrumx_rpc.py index 7cc2fbb..deecd1e 100755 --- a/electrumx_rpc.py +++ b/electrumx_rpc.py @@ -35,8 +35,7 @@ class RPCClient(JSONRPC): self.max_buffer_size = 5000000 if params: params = [params] - payload = self.request_payload(method, id_=method, params=params) - self.encode_and_send_payload(payload) + self.send_request(method, method, params) future = asyncio.ensure_future(self.queue.get()) for f in asyncio.as_completed([future], timeout=timeout): @@ -80,6 +79,7 @@ def main(): except OSError: print('error connecting - is ElectrumX catching up or not running?') finally: + loop.stop() loop.close() diff --git a/lib/jsonrpc.py b/lib/jsonrpc.py index 91cb405..7657cfb 100644 --- a/lib/jsonrpc.py +++ b/lib/jsonrpc.py @@ -13,6 +13,7 @@ import json import numbers import time import traceback +from functools import partial from lib.util import LoggedClass @@ -121,7 +122,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): NEXT_SESSION_ID = 0 @classmethod - def request_payload(cls, method, id_, params=None): + def request_payload(cls, id_, method, params=None): payload = {'jsonrpc': '2.0', 'id': id_, 'method': method} if params: payload['params'] = params @@ -131,10 +132,6 @@ class JSONRPC(asyncio.Protocol, LoggedClass): def response_payload(cls, result, id_): return 
{'jsonrpc': '2.0', 'result': result, 'id': id_} - @classmethod - def notification_payload(cls, method, params=None): - return cls.request_payload(method, None, params) - @classmethod def error_payload(cls, message, code, id_=None): error = {'message': message, 'code': code} @@ -166,6 +163,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): def __init__(self): super().__init__() + self.send_notification = partial(self.send_request, None) self.start = time.time() self.stop = 0 self.last_recv = self.start @@ -316,6 +314,18 @@ class JSONRPC(asyncio.Protocol, LoggedClass): '''Send a JSON error.''' self._send_bytes(self.json_error_bytes(message, code, id_)) + def send_request(self, id_, method, params=None): + '''Send a request. If id_ is None it is a notification.''' + self.encode_and_send_payload(self.request_payload(id_, method, params)) + + def send_notifications(self, mp_iterable): + '''Send an iterable of (method, params) notification pairs. + + A 1-tuple is also valid in which case there are no params.''' + # TODO: maybe send batches if remote side supports it + for pair in mp_iterable: + self.send_notification(*pair) + def encode_payload(self, payload): assert isinstance(payload, dict) @@ -353,10 +363,6 @@ class JSONRPC(asyncio.Protocol, LoggedClass): '''Encode the payload and send it.''' self._send_bytes(self.encode_payload(payload)) - def json_notification_bytes(self, method, params): - '''Return the bytes of a json notification.''' - return self.encode_payload(self.notification_payload(method, params)) - def json_request_bytes(self, method, id_, params=None): '''Return the bytes of a JSON request.''' return self.encode_payload(self.request_payload(method, id_, params)) diff --git a/server/session.py b/server/session.py index 23cf9dc..46ca43e 100644 --- a/server/session.py +++ b/server/session.py @@ -135,32 +135,29 @@ class ElectrumX(Session): Cache is a shared cache for this update. 
''' + controller = self.controller + pairs = [] + if height != self.notified_height: self.notified_height = height if self.subscribe_headers: - payload = self.notification_payload( - 'blockchain.headers.subscribe', - (self.controller.electrum_header(height), ), - ) - self.encode_and_send_payload(payload) + args = (controller.electrum_header(height), ) + pairs.append(('blockchain.headers.subscribe', args)) if self.subscribe_height: - payload = self.notification_payload( - 'blockchain.numblocks.subscribe', - (height, ), - ) - self.encode_and_send_payload(payload) + pairs.append(('blockchain.numblocks.subscribe', (height, ))) matches = touched.intersection(self.hashX_subs) for hashX in matches: address = self.hashX_subs[hashX] - status = await self.controller.address_status(hashX) - payload = self.notification_payload( - 'blockchain.address.subscribe', (address, status)) - self.encode_and_send_payload(payload) + status = await controller.address_status(hashX) + pairs.append(('blockchain.address.subscribe', (address, status))) + self.send_notifications(pairs) if matches: - self.log_info('notified of {:,d} addresses'.format(len(matches))) + es = '' if len(matches) == 1 else 'es' + self.log_info('notified of {:,d} address{}' + .format(len(matches), es)) def height(self): '''Return the current flushed database height.''' From 05a6da1920e0c5ab1051bcaf7310c307ae53fe01 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sun, 22 Jan 2017 19:21:55 +0900 Subject: [PATCH 08/12] Move peer management to peers.py from irc It's cleaner and will be useful for peer-to-peer comms later --- docs/ENVIRONMENT.rst | 5 +- server/controller.py | 30 ++++------ server/env.py | 2 +- server/irc.py | 116 +++++++++--------------------------- server/peers.py | 139 +++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 183 insertions(+), 109 deletions(-) create mode 100644 server/peers.py diff --git a/docs/ENVIRONMENT.rst b/docs/ENVIRONMENT.rst index 24e4993..f323356 100644 --- a/docs/ENVIRONMENT.rst +++ b/docs/ENVIRONMENT.rst @@ -239,8 +239,9 @@ connectivity on IRC: * **REPORT_HOST_TOR** - The tor .onion address to advertise. If set, an additional - connection to IRC happens with '_tor" appended to **IRC_NICK**. + The tor address to advertise; must end with `.onion`. If set, an + additional connection to IRC happens with '_tor' appended to + **IRC_NICK**. 
* **REPORT_TCP_PORT_TOR** diff --git a/server/controller.py b/server/controller.py index 015b2d1..f2a8a6d 100644 --- a/server/controller.py +++ b/server/controller.py @@ -23,8 +23,8 @@ from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash import lib.util as util from server.block_processor import BlockProcessor from server.daemon import Daemon, DaemonError -from server.irc import IRC from server.session import LocalRPC, ElectrumX +from server.peers import PeerManager from server.mempool import MemPool from server.version import VERSION @@ -61,7 +61,7 @@ class Controller(util.LoggedClass): self.daemon = Daemon(env.coin.daemon_urls(env.daemon_url)) self.bp = BlockProcessor(env, self.daemon) self.mempool = MemPool(self.bp) - self.irc = IRC(env) + self.peers = PeerManager(env) self.env = env self.servers = {} # Map of session to the key of its list in self.groups @@ -96,12 +96,14 @@ class Controller(util.LoggedClass): 'block.get_header block.get_chunk estimatefee relayfee ' 'transaction.get transaction.get_merkle utxo.get_address'), ('server', - 'banner donation_address peers.subscribe'), + 'banner donation_address'), ] - self.electrumx_handlers = {'.'.join([prefix, suffix]): - getattr(self, suffix.replace('.', '_')) - for prefix, suffixes in rpcs - for suffix in suffixes.split()} + handlers = {'.'.join([prefix, suffix]): + getattr(self, suffix.replace('.', '_')) + for prefix, suffixes in rpcs + for suffix in suffixes.split()} + handlers['server.peers.subscribe'] = self.peers.subscribe + self.electrumx_handlers = handlers async def mempool_transactions(self, hashX): '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool @@ -207,11 +209,11 @@ class Controller(util.LoggedClass): async def await_bp_catchup(): '''Wait for the block processor to catch up. - When it has, start the servers and connect to IRC. + Then start the servers and the peer manager. ''' await self.bp.caught_up_event.wait() self.logger.info('block processor has caught up') - add_future(self.irc.start()) + add_future(self.peers.main_loop()) add_future(self.start_servers()) add_future(self.mempool.main_loop()) add_future(self.enqueue_delayed_sessions()) @@ -433,7 +435,7 @@ class Controller(util.LoggedClass): 'logged': len([s for s in self.sessions if s.log_me]), 'paused': sum(s.pause for s in self.sessions), 'pid': os.getpid(), - 'peers': len(self.irc.peers), + 'peers': self.peers.count(), 'requests': sum(s.requests_remaining() for s in self.sessions), 'sessions': self.session_count(), 'subs': self.sub_count(), @@ -593,7 +595,7 @@ class Controller(util.LoggedClass): async def rpc_peers(self): '''Return a list of server peers, currently taken from IRC.''' - return self.irc.peers + return self.peers.peer_list() async def rpc_reorg(self, count=3): '''Force a reorg of the given number of blocks. @@ -880,9 +882,3 @@ class Controller(util.LoggedClass): async def donation_address(self): '''Return the donation address as a string, empty if there is none.''' return self.env.donation_address - - async def peers_subscribe(self): - '''Returns the server peers as a list of (ip, host, ports) tuples. 
- - Despite the name this is not currently treated as a subscription.''' - return list(self.irc.peers.values()) diff --git a/server/env.py b/server/env.py index 196d24c..2304284 100644 --- a/server/env.py +++ b/server/env.py @@ -66,7 +66,7 @@ class Env(LoggedClass): self.report_ssl_port if self.report_ssl_port else self.ssl_port) - self.report_host_tor = self.default('REPORT_HOST_TOR', None) + self.report_host_tor = self.default('REPORT_HOST_TOR', '') def default(self, envvar, default): return environ.get(envvar, default) diff --git a/server/irc.py b/server/irc.py index 31154d6..469adc8 100644 --- a/server/irc.py +++ b/server/irc.py @@ -12,7 +12,6 @@ Only calling start() requires the IRC Python module. import asyncio import re -import socket from collections import namedtuple @@ -22,52 +21,26 @@ from lib.util import LoggedClass class IRC(LoggedClass): - Peer = namedtuple('Peer', 'ip_addr host ports') - class DisconnectedError(Exception): pass - def __init__(self, env): + def __init__(self, env, peer_mgr): super().__init__() - self.env = env + self.coin = env.coin + self.peer_mgr = peer_mgr # If this isn't something a peer or client expects # then you won't appear in the client's network dialog box - irc_address = (env.coin.IRC_SERVER, env.coin.IRC_PORT) self.channel = env.coin.IRC_CHANNEL self.prefix = env.coin.IRC_PREFIX - - self.clients = [] self.nick = '{}{}'.format(self.prefix, env.irc_nick if env.irc_nick else double_sha256(env.report_host.encode()) [:5].hex()) - self.clients.append(IrcClient(irc_address, self.nick, - env.report_host, - env.report_tcp_port, - env.report_ssl_port)) - if env.report_host_tor: - self.clients.append(IrcClient(irc_address, self.nick + '_tor', - env.report_host_tor, - env.report_tcp_port_tor, - env.report_ssl_port_tor)) - self.peer_regexp = re.compile('({}[^!]*)!'.format(self.prefix)) - self.peers = {} - async def start(self): + async def start(self, name_pairs): '''Start IRC connections if enabled in environment.''' - try: - if self.env.irc: - await self.join() - else: - self.logger.info('IRC is disabled') - except asyncio.CancelledError: - pass - except Exception as e: - self.logger.error(str(e)) - - async def join(self): import irc.client as irc_client from jaraco.stream import buffer @@ -77,21 +50,18 @@ class IRC(LoggedClass): # Register handlers for events we're interested in reactor = irc_client.Reactor() - for event in 'welcome join quit kick whoreply disconnect'.split(): + for event in 'welcome join quit whoreply disconnect'.split(): reactor.add_global_handler(event, getattr(self, 'on_' + event)) # Note: Multiple nicks in same channel will trigger duplicate events - for client in self.clients: - client.connection = reactor.server() + clients = [IrcClient(self.coin, real_name, self.nick + suffix, + reactor.server()) + for (real_name, suffix) in name_pairs] while True: try: - for client in self.clients: - self.logger.info('Joining IRC in {} as "{}" with ' - 'real name "{}"' - .format(self.channel, client.nick, - client.realname)) - client.connect() + for client in clients: + client.connect(self) while True: reactor.process_once() await asyncio.sleep(2) @@ -130,14 +100,7 @@ class IRC(LoggedClass): '''Called when someone leaves our channel.''' match = self.peer_regexp.match(event.source) if match: - self.peers.pop(match.group(1), None) - - def on_kick(self, connection, event): - '''Called when someone is kicked from our channel.''' - self.log_event(event) - match = self.peer_regexp.match(event.arguments[0]) - if match: - self.peers.pop(match.group(1), 
None)
 
     def on_whoreply(self, connection, event):
         '''Called when a response to our who requests arrives.
 
         The nick is the 4th argument, and real name is in the 6th
         argument preceded by '0 ' for some reason.
         '''
-        try:
-            nick = event.arguments[4]
-            if nick.startswith(self.prefix):
-                line = event.arguments[6].split()
-                try:
-                    ip_addr = socket.gethostbyname(line[1])
-                except socket.error:
-                    # Could be .onion or IPv6.
-                    ip_addr = line[1]
-                peer = self.Peer(ip_addr, line[1], line[2:])
-                self.peers[nick] = peer
-        except (IndexError, UnicodeError):
-            # UnicodeError comes from invalid domains (issue #68)
-            pass
+        nick = event.arguments[4]
+        if nick.startswith(self.prefix):
+            line = event.arguments[6].split()
+            hostname, details = line[1], line[2:]
+            self.peer_mgr.add_irc_peer(nick, hostname, details)
 
 
-class IrcClient(LoggedClass):
+class IrcClient(object):
 
-    VERSION = '1.0'
-    DEFAULT_PORTS = {'t': 50001, 's': 50002}
-
-    def __init__(self, irc_address, nick, host, tcp_port, ssl_port):
-        super().__init__()
-        self.irc_host, self.irc_port = irc_address
+    def __init__(self, coin, real_name, nick, server):
+        self.irc_host = coin.IRC_SERVER
+        self.irc_port = coin.IRC_PORT
         self.nick = nick
-        self.realname = self.create_realname(host, tcp_port, ssl_port)
-        self.connection = None
+        self.real_name = real_name
+        self.server = server
 
-    def connect(self, keepalive=60):
+    def connect(self, irc):
         '''Connect this client to its IRC server'''
-        self.connection.connect(self.irc_host, self.irc_port, self.nick,
-                                ircname=self.realname)
-        self.connection.set_keepalive(keepalive)
-
-    @classmethod
-    def create_realname(cls, host, tcp_port, ssl_port):
-        def port_text(letter, port):
-            if not port:
-                return ''
-            if port == cls.DEFAULT_PORTS.get(letter):
-                return ' ' + letter
-            else:
-                return ' ' + letter + str(port)
-
-        tcp = port_text('t', tcp_port)
-        ssl = port_text('s', ssl_port)
-        return '{} v{}{}{}'.format(host, cls.VERSION, tcp, ssl)
+        irc.logger.info('joining {} as "{}" with real name "{}"'
+                        .format(irc.channel, self.nick, self.real_name))
+        self.server.connect(self.irc_host, self.irc_port, self.nick,
+                            ircname=self.real_name)
diff --git a/server/peers.py b/server/peers.py
new file mode 100644
index 0000000..541c983
--- /dev/null
+++ b/server/peers.py
@@ -0,0 +1,139 @@
+# Copyright (c) 2017, Neil Booth
+#
+# All rights reserved.
+#
+# See the file "LICENCE" for information about the copyright
+# and warranty status of this software.
+
+'''Peer management.'''
+
+import asyncio
+import socket
+import traceback
+from collections import namedtuple
+from functools import partial
+
+import lib.util as util
+from server.irc import IRC
+
+
+NetIdentity = namedtuple('NetIdentity', 'host tcp_port ssl_port nick_suffix')
+IRCPeer = namedtuple('IRCPeer', 'ip_addr host details')
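+
+# Editor's illustration (not part of the original patch): typical values
+# for the two records defined above --
+#   NetIdentity('example.com', 50001, 50002, '')    # clearnet identity
+#   NetIdentity('foo.onion', 50001, None, '_tor')   # Tor identity
+#   IRCPeer('10.1.2.3', 'example.com', ['v1.0', 't', 's'])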
+
+
+class PeerManager(util.LoggedClass):
+    '''Looks after the DB of peer network servers.
+
+    Attempts to maintain a connection with up to 8 peers.
+    Issues a 'peers.subscribe' RPC to them and tells them our data.
+    '''
+    VERSION = '1.0'
+    DEFAULT_PORTS = {'t': 50001, 's': 50002}
+
+    def __init__(self, env):
+        super().__init__()
+        self.env = env
+        self.loop = asyncio.get_event_loop()
+        self.irc = IRC(env, self)
+        self.futures = set()
+        self.identities = []
+        # Keyed by nick
+        self.irc_peers = {}
+
+        # We can have a Tor identity in addition to a normal one
+        self.identities.append(NetIdentity(env.report_host,
+                                           env.report_tcp_port,
+                                           env.report_ssl_port,
+                                           ''))
+        if env.report_host_tor.endswith('.onion'):
+            self.identities.append(NetIdentity(env.report_host_tor,
+                                               env.report_tcp_port_tor,
+                                               env.report_ssl_port_tor,
+                                               '_tor'))
+
+    async def executor(self, func, *args, **kwargs):
+        '''Run func taking args in the executor.'''
+        await self.loop.run_in_executor(None, partial(func, *args, **kwargs))
+
+    @classmethod
+    def real_name(cls, identity):
+        '''Real name as used on IRC.'''
+        def port_text(letter, port):
+            if not port:
+                return ''
+            if port == cls.DEFAULT_PORTS.get(letter):
+                return ' ' + letter
+            else:
+                return ' ' + letter + str(port)
+
+        tcp = port_text('t', identity.tcp_port)
+        ssl = port_text('s', identity.ssl_port)
+        return '{} v{}{}{}'.format(identity.host, cls.VERSION, tcp, ssl)
+
+    def ensure_future(self, coro):
+        '''Convert a coro into a future and add it to our pending list
+        to be waited for.'''
+        self.futures.add(asyncio.ensure_future(coro))
+
+    def start_irc(self):
+        '''Start up the IRC connections if enabled.'''
+        if self.env.irc:
+            name_pairs = [(self.real_name(identity), identity.nick_suffix)
+                          for identity in self.identities]
+            self.ensure_future(self.irc.start(name_pairs))
+        else:
+            self.logger.info('IRC is disabled')
+
+    async def main_loop(self):
+        '''Start and then enter the main loop.'''
+        self.start_irc()
+
+        try:
+            while True:
+                await asyncio.sleep(10)
+                done = [future for future in self.futures if future.done()]
+                self.futures.difference_update(done)
+                for future in done:
+                    try:
+                        future.result()
+                    except:
+                        self.log_error(traceback.format_exc())
+        finally:
+            for future in self.futures:
+                future.cancel()
+
+    def dns_lookup_peer(self, nick, hostname, details):
+        try:
+            ip_addr = None
+            try:
+                ip_addr = socket.gethostbyname(hostname)
+            except socket.error:
+                pass  # IPv6?
+            ip_addr = ip_addr or hostname
+            self.irc_peers[nick] = IRCPeer(ip_addr, hostname, details)
+            self.logger.info('new IRC peer {} at {} ({})'
+                             .format(nick, hostname, details))
+        except UnicodeError:
+            # UnicodeError comes from invalid domains (issue #68)
+            self.logger.info('IRC peer domain {} invalid'.format(hostname))
+
+    def add_irc_peer(self, *args):
+        '''Schedule DNS lookup of peer.'''
+        self.ensure_future(self.executor(self.dns_lookup_peer, *args))
+
+    def remove_irc_peer(self, nick):
+        '''Remove a peer from our IRC peers map.'''
+        self.logger.info('removing IRC peer {}'.format(nick))
+        self.irc_peers.pop(nick, None)
+
+    def count(self):
+        return len(self.irc_peers)
+
+    def peer_list(self):
+        return self.irc_peers
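+
+    # Editor's illustration (not part of the original patch): after a WHO
+    # reply for nick 'E_abc' whose real name is '0 example.com v1.0 t s',
+    # dns_lookup_peer() leaves an entry such as
+    #   {'E_abc': IRCPeer('10.1.2.3', 'example.com', ['v1.0', 't', 's'])}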
+
+    async def subscribe(self):
+        '''Returns the server peers as a list of (ip, host, details) tuples.
+
+        Despite the name this is not currently treated as a subscription.'''
+        return list(self.irc_peers.values())

From 5343c1a2868023dcdef31e20947be8809e66f683 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Petr=20Krac=C3=ADk?=
Date: Sun, 22 Jan 2017 13:42:52 +0100
Subject: [PATCH 09/12] Changed litecoin genesis block hash (#109)

---
 lib/coins.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/lib/coins.py b/lib/coins.py
index 47f1f3f..ee63384 100644
--- a/lib/coins.py
+++ b/lib/coins.py
@@ -318,8 +318,8 @@ class Litecoin(Coin):
     P2PKH_VERBYTE = 0x30
     P2SH_VERBYTE = 0x05
     WIF_BYTE = 0xb0
-    GENESIS_HASH=('000000000019d6689c085ae165831e93'
-                  '4ff763ae46a2a6c172b3f1b60a8ce26f')
+    GENESIS_HASH=('12a765e31ffd4059bada1e25190f6e98'
+                  'c99d9714d334efa41a195a7e7e04bfe2')
     TX_COUNT = 8908766
     TX_COUNT_HEIGHT = 1105256
     TX_PER_BLOCK = 10

From d0d4c0d75f0ade6e9358cd80d16c4501860d92e4 Mon Sep 17 00:00:00 2001
From: Neil Booth
Date: Mon, 23 Jan 2017 07:28:13 +0900
Subject: [PATCH 10/12] Prepare 0.10.10

---
 README.rst        | 8 ++++++++
 server/version.py | 2 +-
 2 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/README.rst b/README.rst
index d9a5c4a..b139045 100644
--- a/README.rst
+++ b/README.rst
@@ -137,6 +137,14 @@ version prior to the release of 1.0.
 ChangeLog
 =========
 
+Version 0.10.10
+---------------
+
+* move peer management from irc.py to peers.py.  This is preparation
+  for peer discovery without IRC.
+* misc cleanups
+* fix Litecoin genesis hash (petrkr)
+
 Version 0.10.9
 --------------
 
diff --git a/server/version.py b/server/version.py
index 22f8ce3..4a16a19 100644
--- a/server/version.py
+++ b/server/version.py
@@ -1 +1 @@
-VERSION = "ElectrumX 0.10.9"
+VERSION = "ElectrumX 0.10.10"

From a7462a6ea3997ef0404df9212397b571af8b523e Mon Sep 17 00:00:00 2001
From: Neil Booth
Date: Fri, 20 Jan 2017 19:00:28 +0900
Subject: [PATCH 11/12] Complete rewrite of JSON RPC infrastructure

Aim to be easier for clients to use, because to do peer discovery
we must act as a client to other servers.

Split out JSON session concept from the asyncio protocol concept.
This makes the JSON RPC support more easily testable and usable as
a stand-alone library.

In addition, support JSON RPC v1 and v2, and auto-detection of
peer's version.
---
 docs/ARCHITECTURE.rst |   2 +-
 docs/ENVIRONMENT.rst  |   3 +-
 electrumx_rpc.py      |  93 +++--
 lib/jsonrpc.py        | 887 ++++++++++++++++++++++++++----------------
 server/controller.py  | 113 +++---
 server/peers.py       |   2 +-
 server/session.py     | 133 +++----
 7 files changed, 709 insertions(+), 524 deletions(-)

diff --git a/docs/ARCHITECTURE.rst b/docs/ARCHITECTURE.rst
index ed1c0e0..b796d8c 100644
--- a/docs/ARCHITECTURE.rst
+++ b/docs/ARCHITECTURE.rst
@@ -36,7 +36,7 @@ Not started until the Block Processor has caught up with bitcoind.
 Daemon
 ------
 
-Encapsulates the RPC wire protcol with bitcoind for the whole server.
+Encapsulates the RPC wire protocol with bitcoind for the whole server.
 Transparently handles temporary bitcoind connection errors, and fails
 over if necessary.
 
diff --git a/docs/ENVIRONMENT.rst b/docs/ENVIRONMENT.rst
index f323356..41f4284 100644
--- a/docs/ENVIRONMENT.rst
+++ b/docs/ENVIRONMENT.rst
@@ -205,7 +205,8 @@ below are low and encourage you to raise them.
   An integer number of seconds defaulting to 600.  Sessions with no
   activity for longer than this are disconnected.  Properly
   functioning Electrum clients by default will send pings roughly
-  every 60 seconds.
+  every 60 seconds, and servers doing peer discovery roughly every 300
+  seconds.
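+
+  For example (editor's illustration, not part of the original patch):
+  setting ``SESSION_TIMEOUT=1200`` would allow sessions to idle for
+  twenty minutes before being disconnected.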
IRC --- diff --git a/electrumx_rpc.py b/electrumx_rpc.py index deecd1e..122a7df 100755 --- a/electrumx_rpc.py +++ b/electrumx_rpc.py @@ -16,44 +16,60 @@ import json from functools import partial from os import environ -from lib.jsonrpc import JSONRPC +from lib.jsonrpc import JSONSession, JSONRPCv2 from server.controller import Controller -class RPCClient(JSONRPC): +class RPCClient(JSONSession): def __init__(self): - super().__init__() - self.queue = asyncio.Queue() - self.max_send = 1000000 + super().__init__(version=JSONRPCv2) + self.max_send = 0 + self.max_buffer_size = 5*10**6 + self.event = asyncio.Event() - def enqueue_request(self, request): - self.queue.put_nowait(request) + def have_pending_items(self): + self.event.set() - async def send_and_wait(self, method, params, timeout=None): - # Raise incoming buffer size - presumably connection is trusted - self.max_buffer_size = 5000000 - if params: - params = [params] - self.send_request(method, method, params) + async def wait_for_response(self): + await self.event.wait() + await self.process_pending_items() - future = asyncio.ensure_future(self.queue.get()) - for f in asyncio.as_completed([future], timeout=timeout): - try: - request = await f - except asyncio.TimeoutError: - future.cancel() - print('request timed out after {}s'.format(timeout)) + def send_rpc_request(self, method, params): + handler = partial(self.handle_response, method) + self.send_request(handler, method, params) + + def handle_response(self, method, result, error): + if method in ('groups', 'sessions') and not error: + if method == 'groups': + lines = Controller.groups_text_lines(result) else: - await request.process(self) - - async def handle_response(self, result, error, method): - if result and method in ('groups', 'sessions'): - for line in Controller.text_lines(method, result): + lines = Controller.sessions_text_lines(result) + for line in lines: print(line) + elif error: + print('error: {} (code {:d})' + .format(error['message'], error['code'])) else: - value = {'error': error} if error else result - print(json.dumps(value, indent=4, sort_keys=True)) + print(json.dumps(result, indent=4, sort_keys=True)) + + +def rpc_send_and_wait(port, method, params, timeout=15): + loop = asyncio.get_event_loop() + coro = loop.create_connection(RPCClient, 'localhost', port) + try: + transport, rpc_client = loop.run_until_complete(coro) + rpc_client.send_rpc_request(method, params) + try: + coro = rpc_client.wait_for_response() + loop.run_until_complete(asyncio.wait_for(coro, timeout)) + except asyncio.TimeoutError: + print('request timed out after {}s'.format(timeout)) + except OSError: + print('cannot connect - is ElectrumX catching up, not running, or ' + 'is {:d} the wrong RPC port?'.format(port)) + finally: + loop.close() def main(): @@ -67,20 +83,17 @@ def main(): help='params to send') args = parser.parse_args() - if args.port is None: - args.port = int(environ.get('RPC_PORT', 8000)) + port = args.port + if port is None: + port = int(environ.get('RPC_PORT', 8000)) - loop = asyncio.get_event_loop() - coro = loop.create_connection(RPCClient, 'localhost', args.port) - try: - transport, protocol = loop.run_until_complete(coro) - coro = protocol.send_and_wait(args.command[0], args.param, timeout=15) - loop.run_until_complete(coro) - except OSError: - print('error connecting - is ElectrumX catching up or not running?') - finally: - loop.stop() - loop.close() + # Get the RPC request. 
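+    # Editor's note (inferred from the wrapping below): 'log' and
+    # 'disconnect' expect their space-separated params as a single list
+    # argument; all other commands take their params positionally.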
+ method = args.command[0] + params = args.param + if method in ('log', 'disconnect'): + params = [params] + + rpc_send_and_wait(port, method, params) if __name__ == '__main__': diff --git a/lib/jsonrpc.py b/lib/jsonrpc.py index 7657cfb..b46992c 100644 --- a/lib/jsonrpc.py +++ b/lib/jsonrpc.py @@ -1,21 +1,26 @@ -# Copyright (c) 2016, Neil Booth +# Copyright (c) 2016-2017, Neil Booth # # All rights reserved. # # See the file "LICENCE" for information about the copyright # and warranty status of this software. -'''Class for handling JSON RPC 2.0 connections, server or client.''' +'''Classes for acting as a peer over a transport and speaking the JSON +RPC versions 1.0 and 2.0. + +JSONSessionBase can use an arbitrary transport. +JSONSession integrates asyncio.Protocol to provide the transport. +''' import asyncio +import collections import inspect import json import numbers import time import traceback -from functools import partial -from lib.util import LoggedClass +import lib.util as util class RPCError(Exception): @@ -26,90 +31,8 @@ class RPCError(Exception): self.code = code -class RequestBase(object): - '''An object that represents a queued request.''' - - def __init__(self, remaining): - self.remaining = remaining - - -class SingleRequest(RequestBase): - '''An object that represents a single request.''' - - def __init__(self, payload): - super().__init__(1) - self.payload = payload - - async def process(self, session): - '''Asynchronously handle the JSON request.''' - self.remaining = 0 - binary = await session.process_single_payload(self.payload) - if binary: - session._send_bytes(binary) - - def __str__(self): - return str(self.payload) - - -class BatchRequest(RequestBase): - '''An object that represents a batch request and its processing state. - - Batches are processed in chunks. - ''' - - def __init__(self, payload): - super().__init__(len(payload)) - self.payload = payload - self.parts = [] - - async def process(self, session): - '''Asynchronously handle the JSON batch according to the JSON 2.0 - spec.''' - target = max(self.remaining - 4, 0) - while self.remaining > target: - item = self.payload[len(self.payload) - self.remaining] - self.remaining -= 1 - part = await session.process_single_payload(item) - if part: - self.parts.append(part) - - total_len = sum(len(part) + 2 for part in self.parts) - if session.is_oversized_request(total_len): - raise RPCError('request too large', JSONRPC.INVALID_REQUEST) - - if not self.remaining: - if self.parts: - binary = b'[' + b', '.join(self.parts) + b']' - session._send_bytes(binary) - - def __str__(self): - return str(self.payload) - - -class JSONRPC(asyncio.Protocol, LoggedClass): - '''Manages a JSONRPC connection. - - Assumes JSON messages are newline-separated and that newlines - cannot appear in the JSON other than to separate lines. Incoming - requests are passed to enqueue_request(), which should arrange for - their asynchronous handling via the request's process() method. - - Derived classes may want to override connection_made() and - connection_lost() but should be sure to call the implementation in - this base class first. They may also want to implement the asynchronous - function handle_response() which by default does nothing. - - The functions request_handler() and notification_handler() are - passed an RPC method name, and should return an asynchronous - function to call to handle it. 
The functions' docstrings are used
-    for help, and the arguments are what can be used as JSONRPC 2.0
-    named arguments (and thus become part of the external interface).
-    If the method is unknown return None.
-
-    Request handlers should return a Python object to return to the
-    caller, or raise an RPCError on error.  Notification handlers
-    should not return a value or raise any exceptions.
-    '''
+class JSONRPC(object):
+    '''Base class of JSON RPC versions.'''
 
     # See http://www.jsonrpc.org/specification
     PARSE_ERROR = -32700
     INVALID_REQUEST = -32600
     METHOD_NOT_FOUND = -32601
     INVALID_ARGS = -32602
     INTERNAL_ERROR = -32603
 
     ID_TYPES = (type(None), str, numbers.Number)
-    NEXT_SESSION_ID = 0
+    HAS_BATCHES = False
+
+
+class JSONRPCv1(JSONRPC):
+    '''JSON RPC version 1.0.'''
 
     @classmethod
     def request_payload(cls, id_, method, params=None):
+        '''JSON v1 request payload.  Params is mandatory.'''
+        return {'method': method, 'params': params or [], 'id': id_}
+
+    @classmethod
+    def notification_payload(cls, method, params=None):
+        '''JSON v1 notification payload.  Params and id are mandatory.'''
+        return {'method': method, 'params': params or [], 'id': None}
+
+    @classmethod
+    def response_payload(cls, result, id_):
+        '''JSON v1 response payload.  error is present and None.'''
+        return {'id': id_, 'result': result, 'error': None}
+
+    @classmethod
+    def error_payload(cls, message, code, id_):
+        '''JSON v1 error payload.  result is present and None.'''
+        return {'id': id_, 'result': None,
+                'error': {'message': message, 'code': code}}
+
+    @classmethod
+    def handle_response(cls, handler, payload):
+        '''JSON v1 response handler.  Both 'error' and 'result'
+        should exist with exactly one being None.
+        '''
+        try:
+            result = payload['result']
+            error = payload['error']
+        except KeyError:
+            pass
+        else:
+            if (result is None) != (error is None):
+                handler(result, error)
+
+    @classmethod
+    def is_request(cls, payload):
+        '''Returns True if the payload (which has a method) is a request.
+        False means it is a notification.'''
+        return payload.get('id') != None
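+
+
+# Editor's illustration (not part of the original patch): the same call
+# encoded by each version --
+#   JSONRPCv1.request_payload(5, 'server.version')
+#       -> {'method': 'server.version', 'params': [], 'id': 5}
+#   JSONRPCv2.request_payload(5, 'server.version')
+#       -> {'jsonrpc': '2.0', 'method': 'server.version', 'id': 5}
+# A v1 notification carries 'id': None; a v2 notification omits 'id'.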
+
+
+class JSONRPCv2(JSONRPC):
+    '''JSON RPC version 2.0.'''
+
+    HAS_BATCHES = True
+
+    @classmethod
+    def request_payload(cls, id_, method, params=None):
+        '''JSON v2 request payload.  Params is optional.'''
+        payload = {'jsonrpc': '2.0', 'method': method, 'id': id_}
+        if params:
+            payload['params'] = params
+        return payload
+
+    @classmethod
+    def notification_payload(cls, method, params=None):
+        '''JSON v2 notification payload.  There must be no id.'''
+        payload = {'jsonrpc': '2.0', 'method': method}
         if params:
             payload['params'] = params
         return payload
 
     @classmethod
     def response_payload(cls, result, id_):
-        return {'jsonrpc': '2.0', 'result': result, 'id': id_}
+        '''JSON v2 response payload.  error is not present.'''
+        return {'jsonrpc': '2.0', 'id': id_, 'result': result}
 
     @classmethod
-    def error_payload(cls, message, code, id_=None):
-        error = {'message': message, 'code': code}
-        return {'jsonrpc': '2.0', 'error': error, 'id': id_}
+    def error_payload(cls, message, code, id_):
+        '''JSON v2 error payload.  result is not present.'''
+        return {'jsonrpc': '2.0', 'id': id_,
+                'error': {'message': message, 'code': code}}
 
     @classmethod
-    def check_payload_id(cls, payload):
+    def handle_response(cls, handler, payload):
+        '''JSON v2 response handler.  Exactly one of 'error' and 'result'
+        must exist.  Errors must have 'code' and 'message' members.
+        '''
+        if ('error' in payload) != ('result' in payload):
+            if 'result' in payload:
+                handler(payload['result'], None)
+            else:
+                error = payload['error']
+                if (isinstance(error, dict) and 'code' in error
+                        and 'message' in error):
+                    handler(None, error)
+
+    @classmethod
+    def batch_size(cls, parts):
+        '''Return the size of a JSON batch from its parts.'''
+        return sum(len(part) for part in parts) + 2 * len(parts)
+
+    @classmethod
+    def batch_bytes(cls, parts):
+        '''Return the bytes of a JSON batch from its parts.'''
+        if parts:
+            return b'[' + b', '.join(parts) + b']'
+        return b''
+
+    @classmethod
+    def is_request(cls, payload):
+        '''Returns True if the payload (which has a method) is a request.
+        False means it is a notification.'''
+        return 'id' in payload
+
+
+class JSONRPCCompat(JSONRPC):
+    '''Intended to be used until receiving a response from the peer, at
+    which point detect_version should be used to choose which version
+    to use.
+
+    Sends requests compatible with v1 and v2.  Errors cannot be
+    compatible so v2 errors are sent.
+
+    Does not send responses or notifications, nor handle responses.
+
+    '''
+    @classmethod
+    def request_payload(cls, id_, method, params=None):
+        '''JSON v2 request payload but with params present.'''
+        return {'jsonrpc': '2.0', 'id': id_,
+                'method': method, 'params': params or []}
+
+    @classmethod
+    def error_payload(cls, message, code, id_):
+        '''JSON v2 error payload.  result is not present.'''
+        return {'jsonrpc': '2.0', 'id': id_,
+                'error': {'message': message, 'code': code}}
+
+    @classmethod
+    def detect_version(cls, payload):
+        '''Return a best guess at a version compatible with the received
+        payload.
+
+        Return None if one cannot be determined.
+        '''
+        def item_version(item):
+            if isinstance(item, dict):
+                version = item.get('jsonrpc')
+                if version is None:
+                    return JSONRPCv1
+                if version == '2.0':
+                    return JSONRPCv2
+            return None
+
+        if isinstance(payload, list) and payload:
+            version = item_version(payload[0])
+            # If a batch, return at least JSONRPCv2
+            if version in (JSONRPCv1, None):
+                version = JSONRPCv2
+        else:
+            version = item_version(payload)
+
+        return version
+
+
+class JSONSessionBase(util.LoggedClass):
+    '''Acts as the application layer session, communicating via JSON RPC
+    over an underlying transport.
+
+    Processes incoming and sends outgoing requests, notifications and
+    responses.  Incoming messages are queued.  When the queue goes
+    from empty to non-empty the session is notified via its
+    have_pending_items() hook.
+    '''
+
+    NEXT_SESSION_ID = 0
+
+    @classmethod
+    def next_session_id(cls):
+        session_id = cls.NEXT_SESSION_ID
+        cls.NEXT_SESSION_ID += 1
+        return session_id
+
+    def __init__(self, version=JSONRPCCompat):
+        super().__init__()
+
+        # Parts of an incomplete JSON line.  We buffer them until
+        # getting a newline.
+        self.parts = []
+        self.version = version
+        self.log_me = False
+        self.session_id = None
+        # Count of incoming complete JSON requests and the time of the
+        # last one.  A batch counts as just one here.
+        self.last_recv = time.time()
+        self.send_count = 0
+        self.send_size = 0
+        self.recv_size = 0
+        self.recv_count = 0
+        self.error_count = 0
+        self.pause = False
+        # Handling of incoming items
+        self.items = collections.deque()
+        self.batch_results = []
+        # Handling of outgoing requests
+        self.next_request_id = 0
+        self.pending_responses = {}
+        # If buffered incoming data exceeds this the connection is closed
+        self.max_buffer_size = 1000000
+        self.max_send = 50000
+        self.close_after_send = False
+
+    def pause_writing(self):
+        '''Transport calls when the send buffer is full.'''
+        self.log_info('pausing processing whilst socket drains')
+        self.pause = True
+
+    def resume_writing(self):
+        '''Transport calls when the send buffer has room.'''
+        self.log_info('resuming processing')
+        self.pause = False
+
+    def is_oversized(self, length):
+        '''Return an error response if the given outgoing message size is
+        too large, otherwise False.'''
+        if self.max_send and length > max(1000, self.max_send):
+            msg = 'response too large (at least {:d} bytes)'.format(length)
+            return self.error_bytes(msg, JSONRPC.INVALID_REQUEST, None)
+        return False
+
+    def send_binary(self, binary):
+        '''Pass the bytes through to the transport.
+
+        Close the connection if close_after_send is set.
+        '''
+        if self.is_closing():
+            return
+        self.using_bandwidth(len(binary))
+        self.send_count += 1
+        self.send_size += len(binary)
+        self.send_bytes(binary)
+        if self.close_after_send:
+            self.close_connection()
+
+    def payload_id(self, payload):
+        '''Extract and return the ID from the payload.
+
+        Returns None if it is missing or invalid.'''
+        try:
+            return self.check_payload_id(payload)
+        except RPCError:
+            return None
+
+    def check_payload_id(self, payload):
         '''Extract and return the ID from the payload.
 
         Raises an RPCError if it is missing or invalid.'''
@@ -146,187 +300,41 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
             raise RPCError('missing id', JSONRPC.INVALID_REQUEST)
 
         id_ = payload['id']
-        if not isinstance(id_, JSONRPC.ID_TYPES):
-            raise RPCError('invalid id: {}'.format(id_),
+        if not isinstance(id_, self.version.ID_TYPES):
+            raise RPCError('invalid id type {}'.format(type(id_)),
                            JSONRPC.INVALID_REQUEST)
         return id_
 
-    @classmethod
-    def payload_id(cls, payload):
-        '''Extract and return the ID from the payload.
+    def request_bytes(self, id_, method, params=None):
+        '''Return the bytes of a JSON request.'''
+        payload = self.version.request_payload(id_, method, params)
+        return self.encode_payload(payload)
 
-        Returns None if it is missing or invalid.'''
-        try:
-            return cls.check_payload_id(payload)
-        except RPCError:
-            return None
+    def notification_bytes(self, method, params=None):
+        payload = self.version.notification_payload(method, params)
+        return self.encode_payload(payload)
 
-    def __init__(self):
-        super().__init__()
-        self.send_notification = partial(self.send_request, None)
-        self.start = time.time()
-        self.stop = 0
-        self.last_recv = self.start
-        self.bandwidth_start = self.start
-        self.bandwidth_interval = 3600
-        self.bandwidth_used = 0
-        self.bandwidth_limit = 5000000
-        self.transport = None
-        self.pause = False
-        # Parts of an incomplete JSON line.  We buffer them until
-        # getting a newline.
- self.parts = [] - # recv_count is JSON messages not calls to data_received() - self.recv_count = 0 - self.recv_size = 0 - self.send_count = 0 - self.send_size = 0 - self.error_count = 0 - self.close_after_send = False - self.peer_info = None - # Sends longer than max_send are prevented, instead returning - # an oversized request error to other end of the network - # connection. The request causing it is logged. Values under - # 1000 are treated as 1000. - self.max_send = 0 - # If buffered incoming data exceeds this the connection is closed - self.max_buffer_size = 1000000 - self.anon_logs = False - self.id_ = JSONRPC.NEXT_SESSION_ID - JSONRPC.NEXT_SESSION_ID += 1 - self.log_prefix = '[{:d}] '.format(self.id_) - self.log_me = False + def response_bytes(self, result, id_): + '''Return the bytes of a JSON response.''' + return self.encode_payload(self.version.response_payload(result, id_)) - def peername(self, *, for_log=True): - '''Return the peer name of this connection.''' - if not self.peer_info: - return 'unknown' - if for_log and self.anon_logs: - return 'xx.xx.xx.xx:xx' - if ':' in self.peer_info[0]: - return '[{}]:{}'.format(self.peer_info[0], self.peer_info[1]) - else: - return '{}:{}'.format(self.peer_info[0], self.peer_info[1]) + def error_bytes(self, message, code, id_=None): + '''Return the bytes of a JSON error. - def connection_made(self, transport): - '''Handle an incoming client connection.''' - self.transport = transport - self.peer_info = transport.get_extra_info('peername') - transport.set_write_buffer_limits(high=500000) - - def connection_lost(self, exc): - '''Handle client disconnection.''' - pass - - def pause_writing(self): - '''Called by asyncio when the write buffer is full.''' - self.log_info('pausing request processing whilst socket drains') - self.pause = True - - def resume_writing(self): - '''Called by asyncio when the write buffer has room.''' - self.log_info('resuming request processing') - self.pause = False - - def close_connection(self): - self.stop = time.time() - if self.transport: - self.transport.close() - - def using_bandwidth(self, amount): - now = time.time() - # Reduce the recorded usage in proportion to the elapsed time - elapsed = now - self.bandwidth_start - self.bandwidth_start = now - refund = int(elapsed / self.bandwidth_interval * self.bandwidth_limit) - refund = min(refund, self.bandwidth_used) - self.bandwidth_used += amount - refund - self.throttled = max(0, self.throttled - int(elapsed) // 60) - - def data_received(self, data): - '''Handle incoming data (synchronously). - - Requests end in newline characters. Pass complete requests to - decode_message for handling. 
- ''' - self.recv_size += len(data) - self.using_bandwidth(len(data)) - - # Close abusive connections where buffered data exceeds limit - buffer_size = len(data) + sum(len(part) for part in self.parts) - if buffer_size > self.max_buffer_size: - self.log_error('read buffer of {:,d} bytes exceeds {:,d} ' - 'byte limit, closing {}' - .format(buffer_size, self.max_buffer_size, - self.peername())) - self.close_connection() - - # Do nothing if this connection is closing - if self.transport.is_closing(): - return - - while True: - npos = data.find(ord('\n')) - if npos == -1: - self.parts.append(data) - break - self.last_recv = time.time() - self.recv_count += 1 - tail, data = data[:npos], data[npos + 1:] - parts, self.parts = self.parts, [] - parts.append(tail) - self.decode_message(b''.join(parts)) - - def decode_message(self, message): - '''Decode a binary message and queue it for asynchronous handling. - - Messages that cannot be decoded are logged and dropped. - ''' - try: - message = message.decode() - except UnicodeDecodeError as e: - msg = 'cannot decode binary bytes: {}'.format(e) - self.send_json_error(msg, JSONRPC.PARSE_ERROR) - return - - try: - message = json.loads(message) - except json.JSONDecodeError as e: - msg = 'cannot decode JSON: {}'.format(e) - self.send_json_error(msg, JSONRPC.PARSE_ERROR) - return - - if isinstance(message, list): - # Batches must have at least one object. - if not message: - self.send_json_error('empty batch', JSONRPC.INVALID_REQUEST) - return - request = BatchRequest(message) - else: - request = SingleRequest(message) - - '''Queue the request for asynchronous handling.''' - self.enqueue_request(request) - if self.log_me: - self.log_info('queued {}'.format(message)) - - def send_json_error(self, message, code, id_=None): - '''Send a JSON error.''' - self._send_bytes(self.json_error_bytes(message, code, id_)) - - def send_request(self, id_, method, params=None): - '''Send a request. If id_ is None it is a notification.''' - self.encode_and_send_payload(self.request_payload(id_, method, params)) - - def send_notifications(self, mp_iterable): - '''Send an iterable of (method, params) notification pairs. 
-
-        A 1-tuple is also valid in which case there are no params.'''
-        # TODO: maybe send batches if remote side supports it
-        for pair in mp_iterable:
-            self.send_notification(*pair)
+
+        Flag the connection to close on a fatal error or too many errors.'''
+        version = self.version
+        self.error_count += 1
+        if code in (version.PARSE_ERROR, version.INVALID_REQUEST):
+            self.log_info(message)
+            self.close_after_send = True
+        elif self.error_count >= 10:
+            self.log_info('too many errors, last: {}'.format(message))
+            self.close_after_send = True
+        return self.encode_payload(self.version.error_payload
+                                   (message, code, id_))

     def encode_payload(self, payload):
+        '''Encode a Python object as binary bytes.'''
         assert isinstance(payload, dict)

         try:
@@ -334,86 +342,118 @@
         except TypeError:
             msg = 'JSON encoding failure: {}'.format(payload)
             self.log_error(msg)
-            binary = self.json_error_bytes(msg, JSONRPC.INTERNAL_ERROR,
-                                           payload.get('id'))
+            binary = self.error_bytes(msg, JSONRPC.INTERNAL_ERROR,
+                                      payload.get('id'))

-        if self.is_oversized_request(len(binary)):
-            binary = self.json_error_bytes('request too large',
-                                           JSONRPC.INVALID_REQUEST,
-                                           payload.get('id'))
-        self.send_count += 1
-        self.send_size += len(binary)
-        self.using_bandwidth(len(binary))
-        return binary
+        error_bytes = self.is_oversized(len(binary))
+        return error_bytes or binary

-    def is_oversized_request(self, total_len):
-        return total_len > max(1000, self.max_send)
+    def decode_message(self, payload):
+        '''Decode a binary message and pass it on to process_single_item or
+        process_batch as appropriate.

-    def _send_bytes(self, binary):
-        '''Send JSON text over the transport.  Close it if close is True.'''
-        # Confirmed this happens, sometimes a lot
-        if self.transport.is_closing():
+        Messages that cannot be decoded are logged and dropped.
+        '''
+        try:
+            payload = payload.decode()
+        except UnicodeDecodeError as e:
+            msg = 'cannot decode message: {}'.format(e)
+            self.send_error(msg, JSONRPC.PARSE_ERROR)
             return
-        self.transport.write(binary)
-        self.transport.write(b'\n')
-        if self.close_after_send:
-            self.close_connection()

-    def encode_and_send_payload(self, payload):
-        '''Encode the payload and send it.'''
-        self._send_bytes(self.encode_payload(payload))
+        try:
+            payload = json.loads(payload)
+        except json.JSONDecodeError as e:
+            msg = 'cannot decode JSON: {}'.format(e)
+            self.send_error(msg, JSONRPC.PARSE_ERROR)
+            return

-    def json_request_bytes(self, method, id_, params=None):
-        '''Return the bytes of a JSON request.'''
-        return self.encode_payload(self.request_payload(method, id_, params))
+        if self.version is JSONRPCCompat:
+            # Attempt to detect peer's JSON RPC version
+            version = self.version.detect_version(payload)
+            if not version:
+                version = JSONRPCv2
+                self.log_info('unable to detect JSON RPC version, using 2.0')
+            self.version = version

-    def json_response_bytes(self, result, id_):
-        '''Return the bytes of a JSON response.'''
-        return self.encode_payload(self.response_payload(result, id_))
+        # Batches must have at least one object.
+        if payload == [] and self.version.HAS_BATCHES:
+            self.send_error('empty batch', JSONRPC.INVALID_REQUEST)
+            return

-    def json_error_bytes(self, message, code, id_=None):
-        '''Return the bytes of a JSON error.
+        # Incoming items get queued for later asynchronous processing.
+        if not self.items:
+            self.have_pending_items()
+        self.items.append(payload)
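
The compat path above defers to detect_version(), whose heuristics live in
the JSON RPC version classes and are not shown in this hunk.  As a rough,
standalone illustration of the kind of classification involved (the function
below is a sketch, not the module's actual API): a JSON RPC 2.0 message
identifies itself explicitly, while 1.0 traffic has to be inferred, which is
why detection can fail and fall back to 2.0.

    # Illustrative sketch only -- not the detect_version() implementation.
    def guess_version(payload):
        '''Return '2.0', '1.0', or None if the payload is ambiguous.'''
        if not isinstance(payload, dict):
            return None
        if payload.get('jsonrpc') == '2.0':
            return '2.0'    # v2 messages carry an explicit version member
        if 'result' in payload and 'error' in payload:
            return '1.0'    # v1 responses carry both, one of them null
        return None         # e.g. a bare request could be either version

    print(guess_version({'jsonrpc': '2.0', 'method': 'server.version',
                         'params': [], 'id': 1}))               # 2.0
    print(guess_version({'result': 'ElectrumX 0.10.11',
                         'error': None, 'id': 1}))              # 1.0
    print(guess_version({'method': 'server.banner', 'id': 2}))  # None
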
-        Flag the connection to close on a fatal error or too many errors.'''
-        self.error_count += 1
-        if (code in (JSONRPC.PARSE_ERROR, JSONRPC.INVALID_REQUEST)
-            or self.error_count > 10):
-            self.close_after_send = True
-        return self.encode_payload(self.error_payload(message, code, id_))
+    async def process_batch(self, batch, count):
+        '''Processes count items from the batch according to the JSON 2.0
+        spec.

-    async def process_single_payload(self, payload):
+        If any remain, puts what is left of the batch back in the deque
+        and returns None.  Otherwise returns the binary batch result.'''
+        results = self.batch_results
+        self.batch_results = []
+
+        for n in range(count):
+            item = batch.pop()
+            result = await self.process_single_item(item)
+            if result:
+                results.append(result)
+
+        if not batch:
+            return self.version.batch_bytes(results)
+
+        error_bytes = self.is_oversized(self.batch_size(results))
+        if error_bytes:
+            return error_bytes
+
+        self.items.appendleft(item)
+        self.batch_results = results
+        return None
+
+    async def process_single_item(self, payload):
         '''Handle a single JSON request, notification or response.

         If it is a request, return the binary response, otherwise None.'''
+        if self.log_me:
+            self.log_info('processing {}'.format(payload))
+
         if not isinstance(payload, dict):
-            return self.json_error_bytes('request must be a dict',
-                                         JSONRPC.INVALID_REQUEST)
+            return self.error_bytes('request must be a dictionary',
+                                    JSONRPC.INVALID_REQUEST)

-        # Requests and notifications must have a method.
-        # Notifications are distinguished by having no 'id'.
-        if 'method' in payload:
-            if 'id' in payload:
-                return await self.process_single_request(payload)
+        try:
+            # Requests and notifications must have a method.
+            if 'method' in payload:
+                if self.version.is_request(payload):
+                    return await self.process_single_request(payload)
+                else:
+                    await self.process_single_notification(payload)
             else:
-            await self.process_single_notification(payload)
-        else:
-            await self.process_single_response(payload)
+                self.process_single_response(payload)

-        return None
+            return None
+        except asyncio.CancelledError:
+            raise
+        except Exception:
+            self.log_error(traceback.format_exc())
+            return self.error_bytes('internal error processing request',
+                                    JSONRPC.INTERNAL_ERROR,
+                                    self.payload_id(payload))

     async def process_single_request(self, payload):
         '''Handle a single JSON request and return the binary response.'''
         try:
             result = await self.handle_payload(payload, self.request_handler)
-            return self.json_response_bytes(result, payload['id'])
+            return self.response_bytes(result, payload['id'])
         except RPCError as e:
-            return self.json_error_bytes(e.msg, e.code,
-                                         self.payload_id(payload))
+            return self.error_bytes(e.msg, e.code, self.payload_id(payload))
         except Exception:
             self.log_error(traceback.format_exc())
-            return self.json_error_bytes('internal error processing request',
-                                         JSONRPC.INTERNAL_ERROR,
-                                         self.payload_id(payload))
+            return self.error_bytes('internal error processing request',
+                                    JSONRPC.INTERNAL_ERROR,
+                                    self.payload_id(payload))

     async def process_single_notification(self, payload):
         '''Handle a single JSON notification.'''
@@ -424,18 +464,16 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
         except Exception:
             self.log_error(traceback.format_exc())

-    async def process_single_response(self, payload):
+    def process_single_response(self, payload):
         '''Handle a single JSON response.'''
         try:
             id_ = self.check_payload_id(payload)
-            # Only one of result and error should exist
-            if 'error' in payload:
-                error = payload['error']
-                if (not 'result' in payload and isinstance(error, dict)
-                        and 'code' in error and 'message' in error):
-                    await self.handle_response(None, error, id_)
-            elif 'result' in payload:
-                await self.handle_response(payload['result'], None, id_)
+            handler = self.pending_responses.pop(id_, None)
+            if handler:
+                self.version.handle_response(handler, payload)
+            else:
+                self.log_info('response for unsent id {}'.format(id_),
+                              throttle=True)
         except RPCError:
             pass
         except Exception:
@@ -448,7 +486,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):

         method = payload.get('method')
         if not isinstance(method, str):
-            raise RPCError("invalid method: '{}'".format(method),
+            raise RPCError("invalid method type {}".format(type(method)),
                            JSONRPC.INVALID_REQUEST)

         handler = get_handler(method)
         if handler is None:
             raise RPCError("unknown method: '{}'".format(method),
                            JSONRPC.METHOD_NOT_FOUND)

         if not isinstance(args, (list, dict)):
-            raise RPCError('arguments should be an array or a dict',
+            raise RPCError('arguments should be an array or dictionary',
                            JSONRPC.INVALID_REQUEST)

         params = inspect.signature(handler).parameters

         min_args = sum(p.default is p.empty for p in params.values())
         if len(args) < min_args:
-            raise RPCError('too few arguments: expected {:d} got {:d}'
-                           .format(min_args, len(args)), JSONRPC.INVALID_ARGS)
+            raise RPCError('too few arguments to {}: expected {:d} got {:d}'
+                           .format(method, min_args, len(args)),
+                           JSONRPC.INVALID_ARGS)

         if len(args) > len(params):
-            raise RPCError('too many arguments: expected {:d} got {:d}'
-                           .format(len(params), len(args)),
+            raise RPCError('too many arguments to {}: expected {:d} got {:d}'
+                           .format(method, len(params), len(args)),
+                           JSONRPC.INVALID_ARGS)

         if isinstance(args, list):
@@ -483,23 +522,179 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
             raise RPCError('invalid parameter names: {}'
                            .format(', '.join(bad_names)))

-        return await handler(**kw_args)
+        if inspect.iscoroutinefunction(handler):
+            return await handler(**kw_args)
+        else:
+            return handler(**kw_args)

-    # --- derived classes are intended to override these functions

-    def enqueue_request(self, request):
-        '''Enqueue a request for later asynchronous processing.'''
+    # ---- External Interface ----
+
+    async def process_pending_items(self, limit=8):
+        '''Processes up to LIMIT pending items asynchronously.'''
+        while limit > 0 and self.items:
+            item = self.items.popleft()
+            if isinstance(item, list) and self.version.HAS_BATCHES:
+                count = min(limit, len(item))
+                binary = await self.process_batch(item, count)
+                limit -= count
+            else:
+                binary = await self.process_single_item(item)
+                limit -= 1
+
+            if binary:
+                self.send_binary(binary)
+
+    def count_pending_items(self):
+        '''Counts the number of pending items.'''
+        return sum(len(item) if isinstance(item, list) else 1
+                   for item in self.items)
+
+    def connection_made(self):
+        '''Call when an incoming client connection is established.'''
+        self.session_id = self.next_session_id()
+        self.log_prefix = '[{:d}] '.format(self.session_id)
+
+    def data_received(self, data):
+        '''Underlying transport calls this when new data comes in.
+
+        Look for newline separators terminating full requests.
+        '''
+        if self.is_closing():
+            return
+        self.using_bandwidth(len(data))
+        self.recv_size += len(data)
+
+        # Close abusive connections where buffered data exceeds limit
+        buffer_size = len(data) + sum(len(part) for part in self.parts)
+        if buffer_size > self.max_buffer_size:
+            self.log_error('read buffer of {:,d} bytes over {:,d} byte limit'
+                           .format(buffer_size, self.max_buffer_size))
+            self.close_connection()
+            return
+
+        while True:
+            npos = data.find(ord('\n'))
+            if npos == -1:
+                self.parts.append(data)
+                break
+            tail, data = data[:npos], data[npos + 1:]
+            parts, self.parts = self.parts, []
+            parts.append(tail)
+            self.recv_count += 1
+            self.last_recv = time.time()
+            self.decode_message(b''.join(parts))
+
+    def send_error(self, message, code, id_=None):
+        '''Send a JSON error.'''
+        self.send_binary(self.error_bytes(message, code, id_))
+
+    def send_request(self, handler, method, params=None):
+        '''Sends a request and arranges for handler to be called with the
+        response when it comes in.
+        '''
+        id_ = self.next_request_id
+        self.next_request_id += 1
+        self.send_binary(self.request_bytes(id_, method, params))
+        self.pending_responses[id_] = handler
+
+    def send_notification(self, method, params=None):
+        '''Send a notification.'''
+        self.send_binary(self.notification_bytes(method, params))
+
+    def send_notifications(self, mp_iterable):
+        '''Send an iterable of (method, params) notification pairs.
+
+        A 1-tuple is also valid in which case there are no params.'''
+        if self.version.HAS_BATCHES:
+            parts = [self.notification_bytes(*pair) for pair in mp_iterable]
+            self.send_binary(batch_bytes(parts))
+        else:
+            for pair in mp_iterable:
+                self.send_notification(*pair)

+    # -- derived classes are intended to override these functions
+
+    # Transport layer
+
+    def is_closing(self):
+        '''Return True if the underlying transport is closing.'''
         raise NotImplementedError
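
The send_request() plumbing above is the heart of the usability improvement
this release describes: a caller hands over a handler, and the session pairs
it with the request id in pending_responses so process_single_response() can
route the reply.  A toy mirror of that bookkeeping (this sketch assumes
handlers take the decoded response payload; the real dispatch goes through
version.handle_response(), whose signature is not shown here):

    import itertools

    class RequestTracker:
        '''Pairs outgoing request ids with response handlers.'''

        def __init__(self):
            self.next_id = itertools.count()
            self.pending = {}      # id -> handler, like pending_responses

        def send(self, method, params, handler):
            id_ = next(self.next_id)
            self.pending[id_] = handler
            return {'method': method, 'params': params, 'id': id_}

        def on_response(self, payload):
            handler = self.pending.pop(payload.get('id'), None)
            if handler:
                handler(payload)
            # else: a response to an id we never sent; log and drop

    tracker = RequestTracker()
    request = tracker.send('blockchain.numblocks.subscribe', [],
                           lambda payload: print('height', payload['result']))
    tracker.on_response({'id': request['id'], 'result': 454000})
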
-    async def handle_response(self, result, error, id_):
-        '''Handle a JSON response.
+    def close_connection(self):
+        '''Close the connection.'''
+        raise NotImplementedError

-        Should not raise an exception.  Return values are ignored.
+    def send_bytes(self, binary):
+        '''Pass the bytes through to the underlying transport.'''
+        raise NotImplementedError
+
+    # App layer
+
+    def have_pending_items(self):
+        '''Called to indicate there are items pending to be processed
+        asynchronously by calling process_pending_items.
+
+        This is *not* called every time an item is added, just when
+        there were previously none and now there is at least one.
         '''
+        raise NotImplementedError
+
+    def using_bandwidth(self, amount):
+        '''Called as bandwidth is consumed.
+
+        Override to implement bandwidth management.
+        '''
+        pass

     def notification_handler(self, method):
-        '''Return the async handler for the given notification method.'''
+        '''Return the handler for the given notification.
+
+        The handler can be synchronous or asynchronous.'''
         return None

     def request_handler(self, method):
-        '''Return the async handler for the given request method.'''
+        '''Return the handler for the given request method.
+
+        The handler can be synchronous or asynchronous.'''
         return None
+
+
+class JSONSession(JSONSessionBase, asyncio.Protocol):
+    '''A JSONSessionBase instance specialized for use with
+    asyncio.protocol to implement the transport layer.
+
+    Derived classes must provide have_pending_items() and may want to
+    override the request and notification handlers.
+    '''
+
+    def __init__(self, version=JSONRPCCompat):
+        super().__init__(version=version)
+        self.transport = None
+        self.write_buffer_high = 500000
+
+    def peer_info(self):
+        '''Returns information about the peer.'''
+        return self.transport.get_extra_info('peername')
+
+    def abort(self):
+        '''Cut the connection abruptly.'''
+        self.transport.abort()
+
+    def connection_made(self, transport):
+        '''Handle an incoming client connection.'''
+        transport.set_write_buffer_limits(high=self.write_buffer_high)
+        self.transport = transport
+        super().connection_made()
+
+    def is_closing(self):
+        '''True if the underlying transport is closing.'''
+        return self.transport and self.transport.is_closing()
+
+    def close_connection(self):
+        '''Close the connection.'''
+        if self.transport:
+            self.transport.close()
+
+    def send_bytes(self, binary):
+        '''Send JSON text over the transport.'''
+        self.transport.writelines((binary, b'\n'))
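
That completes the new lib/jsonrpc.py.  A derived session now only has to
provide have_pending_items() and whichever handlers it wants.  The sketch
below, which assumes the module layout above, schedules processing directly;
ElectrumX's own sessions instead queue themselves with the controller (next
diff) so processing is spread fairly across connections:

    import asyncio

    from lib.jsonrpc import JSONSession

    class EchoSession(JSONSession):
        '''A toy session: echoes back the argument of "echo" requests.'''

        def have_pending_items(self):
            # No controller here; process the queued items directly.
            asyncio.ensure_future(self.process_pending_items())

        def request_handler(self, method):
            if method == 'echo':
                return lambda message: message   # synchronous handler
            return None

    loop = asyncio.get_event_loop()
    server = loop.run_until_complete(
        loop.create_server(EchoSession, 'localhost', 8000))
    loop.run_forever()
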
diff --git a/server/controller.py b/server/controller.py
index f2a8a6d..0372862 100644
--- a/server/controller.py
+++ b/server/controller.py
@@ -18,14 +18,14 @@
 from functools import partial

 import pylru

-from lib.jsonrpc import JSONRPC, RPCError, RequestBase
+from lib.jsonrpc import JSONRPC, RPCError
 from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash
 import lib.util as util
 from server.block_processor import BlockProcessor
 from server.daemon import Daemon, DaemonError
-from server.session import LocalRPC, ElectrumX
-from server.peers import PeerManager
 from server.mempool import MemPool
+from server.peers import PeerManager
+from server.session import LocalRPC, ElectrumX
 from server.version import VERSION
@@ -39,16 +39,6 @@ class Controller(util.LoggedClass):
     BANDS = 5
     CATCHING_UP, LISTENING, PAUSED, SHUTTING_DOWN = range(4)

-    class NotificationRequest(RequestBase):
-        def __init__(self, height, touched):
-            super().__init__(1)
-            self.height = height
-            self.touched = touched
-
-        async def process(self, session):
-            self.remaining = 0
-            await session.notify(self.height, self.touched)
-
     def __init__(self, env):
         super().__init__()
         # Set this event to cleanly shutdown
@@ -56,7 +46,7 @@ class Controller(util.LoggedClass):
         self.loop = asyncio.get_event_loop()
         self.executor = ThreadPoolExecutor()
         self.loop.set_default_executor(self.executor)
-        self.start = time.time()
+        self.start_time = time.time()
         self.coin = env.coin
         self.daemon = Daemon(env.coin.daemon_urls(env.daemon_url))
         self.bp = BlockProcessor(env, self.daemon)
@@ -141,9 +131,9 @@ class Controller(util.LoggedClass):
         if isinstance(session, LocalRPC):
             return 0
         gid = self.sessions[session]
-        group_bandwidth = sum(s.bandwidth_used for s in self.groups[gid])
-        return 1 + (bisect_left(self.bands, session.bandwidth_used)
-                    + bisect_left(self.bands, group_bandwidth)) // 2
+        group_bw = sum(session.bw_used for session in self.groups[gid])
+        return 1 + (bisect_left(self.bands, session.bw_used)
+                    + bisect_left(self.bands, group_bw)) // 2

     def is_deprioritized(self, session):
         return self.session_priority(session) > self.BANDS
@@ -166,6 +156,15 @@ class Controller(util.LoggedClass):
                     and self.state == self.PAUSED):
                 await self.start_external_servers()

+            # Periodically log sessions
+            if self.env.log_sessions and time.time() > self.next_log_sessions:
+                if self.next_log_sessions:
+                    data = self.session_data(for_log=True)
+                    for line in Controller.sessions_text_lines(data):
+                        self.logger.info(line)
+                    self.logger.info(json.dumps(self.server_summary()))
+                self.next_log_sessions = time.time() + self.env.log_sessions
+
             await asyncio.sleep(1)

     def enqueue_session(self, session):
@@ -195,7 +194,10 @@ class Controller(util.LoggedClass):
         while True:
             priority_, id_, session = await self.queue.get()
             if session in self.sessions:
-                await session.serve_requests()
+                await session.process_pending_items()
+                # Re-enqueue the session if stuff is left
+                if session.items:
+                    self.enqueue_session(session)

     def initiate_shutdown(self):
         '''Call this function to start the shutdown process.'''
@@ -265,8 +267,8 @@ class Controller(util.LoggedClass):

     async def start_server(self, kind, *args, **kw_args):
         protocol_class = LocalRPC if kind == 'RPC' else ElectrumX
-        protocol = partial(protocol_class, self, self.bp, self.env, kind)
-        server = self.loop.create_server(protocol, *args, **kw_args)
+        protocol_factory = partial(protocol_class, self, kind)
+        server = self.loop.create_server(protocol_factory, *args, **kw_args)

         host, port = args[:2]
         try:
@@ -329,17 +331,7 @@ class Controller(util.LoggedClass):

         for session in self.sessions:
             if isinstance(session, ElectrumX):
-                request = self.NotificationRequest(self.bp.db_height,
-                                                   touched)
-                session.enqueue_request(request)
-        # Periodically log sessions
-        if self.env.log_sessions and time.time() > self.next_log_sessions:
-            if self.next_log_sessions:
-                data = self.session_data(for_log=True)
-                for line in Controller.sessions_text_lines(data):
-                    self.logger.info(line)
-                self.logger.info(json.dumps(self.server_summary()))
-            self.next_log_sessions = time.time() + self.env.log_sessions
+                await session.notify(self.bp.db_height, touched)

     def electrum_header(self, height):
         '''Return the binary header at the given height.'''
@@ -357,7 +349,7 @@ class Controller(util.LoggedClass):
         if now > self.next_stale_check:
             self.next_stale_check = now + 300
             self.clear_stale_sessions()
-        gid = int(session.start - self.start) // 900
+        gid = int(session.start_time - self.start_time) // 900
         self.groups[gid].append(session)
         self.sessions[session] = gid
         session.log_info('{} {}, {:,d} total'
@@ -381,12 +373,12 @@ class Controller(util.LoggedClass):

     def close_session(self, session):
         '''Close the session's transport and cancel its future.'''
         session.close_connection()
-        return 'disconnected {:d}'.format(session.id_)
+        return 'disconnected {:d}'.format(session.session_id)

     def toggle_logging(self, session):
         '''Toggle logging of the session.'''
         session.log_me = not session.log_me
-        return 'log {:d}: {}'.format(session.id_, session.log_me)
+        return 'log {:d}: {}'.format(session.session_id, session.log_me)

     def clear_stale_sessions(self, grace=15):
         '''Cut off sessions that haven't done anything for 10 minutes.  Force
@@ -400,17 +392,17 @@ class Controller(util.LoggedClass):
         stale = []
         for session in self.sessions:
             if session.is_closing():
-                if session.stop <= shutdown_cutoff:
-                    session.transport.abort()
+                if session.close_time <= shutdown_cutoff:
+                    session.abort()
             elif session.last_recv < stale_cutoff:
                 self.close_session(session)
-                stale.append(session.id_)
+                stale.append(session.session_id)
         if stale:
             self.logger.info('closing stale connections {}'.format(stale))

         # Consolidate small groups
         gids = [gid for gid, l in self.groups.items() if len(l) <= 4
-                and sum(session.bandwidth_used for session in l) < 10000]
+                and sum(session.bw_used for session in l) < 10000]
         if len(gids) > 1:
             sessions = sum([self.groups[gid] for gid in gids], [])
             new_gid = max(gids)
@@ -436,7 +428,7 @@ class Controller(util.LoggedClass):
             'paused': sum(s.pause for s in self.sessions),
             'pid': os.getpid(),
             'peers': self.peers.count(),
-            'requests': sum(s.requests_remaining() for s in self.sessions),
+            'requests': sum(s.count_pending_items() for s in self.sessions),
             'sessions': self.session_count(),
             'subs': self.sub_count(),
             'txs_sent': self.txs_sent,
@@ -445,13 +437,6 @@ class Controller(util.LoggedClass):
     def sub_count(self):
         return sum(s.sub_count() for s in self.sessions)

-    @staticmethod
-    def text_lines(method, data):
-        if method == 'sessions':
-            return Controller.sessions_text_lines(data)
-        else:
-            return Controller.groups_text_lines(data)
-
     @staticmethod
     def groups_text_lines(data):
         '''A generator returning lines for a list of groups.
@@ -482,8 +467,8 @@ class Controller(util.LoggedClass):
             sessions = self.groups[gid]
             result.append([gid,
                            len(sessions),
-                           sum(s.bandwidth_used for s in sessions),
-                           sum(s.requests_remaining() for s in sessions),
+                           sum(s.bw_used for s in sessions),
+                           sum(s.count_pending_items() for s in sessions),
                            sum(s.txs_sent for s in sessions),
                            sum(s.sub_count() for s in sessions),
                            sum(s.recv_count for s in sessions),
@@ -523,17 +508,17 @@ class Controller(util.LoggedClass):
     def session_data(self, for_log):
         '''Returned to the RPC 'sessions' call.'''
         now = time.time()
-        sessions = sorted(self.sessions, key=lambda s: s.start)
-        return [(session.id_,
+        sessions = sorted(self.sessions, key=lambda s: s.start_time)
+        return [(session.session_id,
                  session.flags(),
                  session.peername(for_log=for_log),
                  session.client,
-                 session.requests_remaining(),
+                 session.count_pending_items(),
                  session.txs_sent,
                  session.sub_count(),
                  session.recv_count, session.recv_size,
                  session.send_count, session.send_size,
-                 now - session.start)
+                 now - session.start_time)
                 for session in sessions]

     def lookup_session(self, session_id):
@@ -543,7 +528,7 @@ class Controller(util.LoggedClass):
             pass
         else:
             for session in self.sessions:
-                if session.id_ == session_id:
+                if session.session_id == session_id:
                     return session
         return None
@@ -562,42 +547,42 @@ class Controller(util.LoggedClass):

     # Local RPC command handlers

-    async def rpc_disconnect(self, session_ids):
+    def rpc_disconnect(self, session_ids):
         '''Disconnect sessions.
         session_ids: array of session IDs
         '''
         return self.for_each_session(session_ids, self.close_session)

-    async def rpc_log(self, session_ids):
+    def rpc_log(self, session_ids):
         '''Toggle logging of sessions.

         session_ids: array of session IDs
         '''
         return self.for_each_session(session_ids, self.toggle_logging)

-    async def rpc_stop(self):
+    def rpc_stop(self):
         '''Shut down the server cleanly.'''
         self.initiate_shutdown()
         return 'stopping'

-    async def rpc_getinfo(self):
+    def rpc_getinfo(self):
         '''Return summary information about the server process.'''
         return self.server_summary()

-    async def rpc_groups(self):
+    def rpc_groups(self):
         '''Return statistics about the session groups.'''
         return self.group_data()

-    async def rpc_sessions(self):
+    def rpc_sessions(self):
         '''Return statistics about connected sessions.'''
         return self.session_data(for_log=False)

-    async def rpc_peers(self):
+    def rpc_peers(self):
        '''Return a list of server peers, currently taken from IRC.'''
         return self.peers.peer_list()

-    async def rpc_reorg(self, count=3):
+    def rpc_reorg(self, count=3):
         '''Force a reorg of the given number of blocks.

         count: number of blocks to reorg (default 3)
@@ -779,14 +764,14 @@ class Controller(util.LoggedClass):
                  'height': utxo.height, 'value': utxo.value}
                 for utxo in sorted(await self.get_utxos(hashX))]

-    async def block_get_chunk(self, index):
+    def block_get_chunk(self, index):
         '''Return a chunk of block headers.

         index: the chunk index'''
         index = self.non_negative_integer(index)
         return self.get_chunk(index)

-    async def block_get_header(self, height):
+    def block_get_header(self, height):
         '''The deserialized header at a given height.

         height: the header's height'''
@@ -879,6 +864,6 @@ class Controller(util.LoggedClass):

         return banner

-    async def donation_address(self):
+    def donation_address(self):
         '''Return the donation address as a string, empty if there is none.'''
         return self.env.donation_address
diff --git a/server/peers.py b/server/peers.py
index 541c983..31519a3 100644
--- a/server/peers.py
+++ b/server/peers.py
@@ -132,7 +132,7 @@ class PeerManager(util.LoggedClass):
     def peer_list(self):
         return self.irc_peers

-    async def subscribe(self):
+    def subscribe(self):
         '''Returns the server peers as a list of (ip, host, details) tuples.

         Despite the name this is not currently treated as a subscription.'''
diff --git a/server/session.py b/server/session.py
index 46ca43e..8916f49 100644
--- a/server/session.py
+++ b/server/session.py
@@ -9,39 +9,61 @@

 import asyncio
+import time
 import traceback
+from functools import partial

-from lib.jsonrpc import JSONRPC, RPCError
+from lib.jsonrpc import JSONSession, RPCError
 from server.daemon import DaemonError
 from server.version import VERSION


-class Session(JSONRPC):
-    '''Base class of ElectrumX JSON session protocols.
+class SessionBase(JSONSession):
+    '''Base class of ElectrumX JSON sessions.

     Each session runs its tasks in asynchronous parallelism with other
-    sessions.  To prevent some sessions blocking others, potentially
-    long-running requests should yield.
+    sessions.
     '''

-    def __init__(self, controller, bp, env, kind):
+    def __init__(self, controller, kind):
         super().__init__()
+        self.kind = kind  # 'RPC', 'TCP' etc.
         self.controller = controller
-        self.bp = bp
-        self.env = env
-        self.daemon = bp.daemon
-        self.kind = kind
+        self.bp = controller.bp
+        self.env = controller.env
+        self.daemon = self.bp.daemon
         self.client = 'unknown'
-        self.anon_logs = env.anon_logs
-        self.max_send = env.max_send
-        self.bandwidth_limit = env.bandwidth_limit
+        self.anon_logs = self.env.anon_logs
         self.last_delay = 0
         self.txs_sent = 0
         self.requests = []
+        self.start_time = time.time()
+        self.close_time = 0
+        self.bw_time = self.start_time
+        self.bw_interval = 3600
+        self.bw_used = 0

-    def is_closing(self):
-        '''True if this session is closing.'''
-        return self.transport and self.transport.is_closing()
+    def have_pending_items(self):
+        '''Called each time the pending item queue goes from empty to having
+        one item.'''
+        self.controller.enqueue_session(self)
+
+    def close_connection(self):
+        '''Call this to close the connection.'''
+        self.close_time = time.time()
+        super().close_connection()
+
+    def peername(self, *, for_log=True):
+        '''Return the peer name of this connection.'''
+        peer_info = self.peer_info()
+        if not peer_info:
+            return 'unknown'
+        if for_log and self.anon_logs:
+            return 'xx.xx.xx.xx:xx'
+        if ':' in peer_info[0]:
+            return '[{}]:{}'.format(peer_info[0], peer_info[1])
+        else:
+            return '{}:{}'.format(peer_info[0], peer_info[1])

     def flags(self):
         '''Status flags.'''
@@ -53,42 +75,6 @@ class Session(JSONRPC):
         status += str(self.controller.session_priority(self))
         return status

-    def requests_remaining(self):
-        return sum(request.remaining for request in self.requests)
-
-    def enqueue_request(self, request):
-        '''Add a request to the session's list.'''
-        self.requests.append(request)
-        if len(self.requests) == 1:
-            self.controller.enqueue_session(self)
-
-    async def serve_requests(self):
-        '''Serve requests in batches.'''
-        total = 0
-        errs = []
-        # Process 8 items at a time
-        for request in self.requests:
-            try:
-                initial = request.remaining
-                await request.process(self)
-                total += initial - request.remaining
-            except asyncio.CancelledError:
-                raise
-            except Exception:
-                # Should probably be considered a bug and fixed
-                self.log_error('error handling request {}'.format(request))
-                traceback.print_exc()
-                errs.append(request)
-            await asyncio.sleep(0)
-            if total >= 8:
-                break
-
-        # Remove completed requests and re-enqueue ourself if any remain.
-        self.requests = [req for req in self.requests
-                         if req.remaining and not req in errs]
-        if self.requests:
-            self.controller.enqueue_session(self)
-
     def connection_made(self, transport):
         '''Handle an incoming client connection.'''
         super().connection_made(transport)
@@ -96,27 +82,32 @@ class Session(JSONRPC):

     def connection_lost(self, exc):
         '''Handle client disconnection.'''
-        super().connection_lost(exc)
-        if (self.pause or self.controller.is_deprioritized(self)
-                or self.send_size >= 1024*1024 or self.error_count):
-            self.log_info('disconnected. Sent {:,d} bytes in {:,d} messages '
-                          '{:,d} errors'
-                          .format(self.send_size, self.send_count,
-                                  self.error_count))
+        msg = ''
+        if self.pause:
+            msg += ' whilst paused'
+        if self.controller.is_deprioritized(self):
+            msg += ' whilst deprioritized'
+        if self.send_size >= 1024*1024:
+            msg += ('. Sent {:,d} bytes in {:,d} messages'
+                    .format(self.send_size, self.send_count))
+        if msg:
+            msg = 'disconnected' + msg
+            self.log_info(msg)
         self.controller.remove_session(self)

     def sub_count(self):
         return 0


-class ElectrumX(Session):
+class ElectrumX(SessionBase):
     '''A TCP server that handles incoming Electrum connections.'''

-    def __init__(self, *args):
-        super().__init__(*args)
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
         self.subscribe_headers = False
         self.subscribe_height = False
         self.notified_height = None
+        self.max_send = self.env.max_send
         self.max_subs = self.env.max_session_subs
         self.hashX_subs = {}
         self.electrumx_handlers = {
@@ -124,7 +115,7 @@
             'blockchain.headers.subscribe': self.headers_subscribe,
             'blockchain.numblocks.subscribe': self.numblocks_subscribe,
             'blockchain.transaction.broadcast': self.transaction_broadcast,
-            'server.version': self.version,
+            'server.version': self.server_version,
         }

     def sub_count(self):
@@ -167,12 +158,12 @@
         '''Used as response to a headers subscription request.'''
         return self.controller.electrum_header(self.height())

-    async def headers_subscribe(self):
+    def headers_subscribe(self):
         '''Subscribe to get headers of new blocks.'''
         self.subscribe_headers = True
         return self.current_electrum_header()

-    async def numblocks_subscribe(self):
+    def numblocks_subscribe(self):
         '''Subscribe to get height of new blocks.'''
         self.subscribe_height = True
         return self.height()
@@ -190,7 +181,7 @@
         self.hashX_subs[hashX] = address
         return status

-    async def version(self, client_name=None, protocol_version=None):
+    def server_version(self, client_name=None, protocol_version=None):
         '''Returns the server version as a string.

         client_name: a string identifying the client
@@ -241,13 +232,13 @@
         return handler


-class LocalRPC(Session):
-    '''A local TCP RPC server for querying status.'''
+class LocalRPC(SessionBase):
+    '''A local TCP RPC server session.'''

-    def __init__(self, *args):
-        super().__init__(*args)
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
         self.client = 'RPC'
-        self.max_send = 5000000
+        self.max_send = 0

     def request_handler(self, method):
         '''Return the async handler for the given request method.'''

From 1a48ad136c8be10eff352247d3374c32af6d0f8c Mon Sep 17 00:00:00 2001
From: Neil Booth
Date: Mon, 23 Jan 2017 23:39:49 +0900
Subject: [PATCH 12/12] Prepare 0.10.11

---
 README.rst        | 9 +++++++--
 server/version.py | 2 +-
 2 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/README.rst b/README.rst
index b139045..ad2b58a 100644
--- a/README.rst
+++ b/README.rst
@@ -115,8 +115,6 @@ Roadmap Pre-1.0

 - minor code cleanups.
 - implement simple protocol to discover peers without resorting to IRC.
-  This may slip to post 1.0
-
 Roadmap Post-1.0
 ================
@@ -137,6 +135,13 @@ version prior to the release of 1.0.
 ChangeLog
 =========

+Version 0.10.11
+---------------
+
+* rewrite of the JSON RPC layer to improve usability for clients.
+  Includes support for JSON RPC v1 and v2, and a compat layer that
+  tries to detect the peer's version.
+
 Version 0.10.10
 ---------------

diff --git a/server/version.py b/server/version.py
index 4a16a19..0dc2bb7 100644
--- a/server/version.py
+++ b/server/version.py
@@ -1 +1 @@
-VERSION = "ElectrumX 0.10.10"
+VERSION = "ElectrumX 0.10.11"
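
For completeness, this is what a client exchange against a server running
this release might look like.  The host and port below are placeholders
(50001 is the customary ElectrumX TCP port), and requests are
newline-terminated JSON exactly as data_received() expects:

    import json
    import socket

    def call(method, params, host='localhost', port=50001):
        request = {'jsonrpc': '2.0', 'method': method,
                   'params': params, 'id': 1}
        with socket.create_connection((host, port)) as sock:
            sock.sendall(json.dumps(request).encode() + b'\n')
            response = b''
            while not response.endswith(b'\n'):
                response += sock.recv(8192)
        return json.loads(response.decode())

    # e.g. {'jsonrpc': '2.0', 'result': 'ElectrumX 0.10.11', 'id': 1}
    print(call('server.version', ['example-client', '1.0']))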