From 41fb14394763b2c7f9339b762d227dfdb343695b Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sun, 13 Nov 2016 23:08:13 +0900 Subject: [PATCH 01/21] Fix typo --- lib/jsonrpc.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/jsonrpc.py b/lib/jsonrpc.py index 663b84c..62978f4 100644 --- a/lib/jsonrpc.py +++ b/lib/jsonrpc.py @@ -63,7 +63,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): def __init__(self, msg, code=-1, **kw_args): super().__init__(**kw_args) self.msg = msg - self.code + self.code = code def __init__(self): @@ -231,7 +231,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): handler = self.method_handler(method) if not handler: - raise self.RPCError('unknown method: {}'.format(method), + raise self.RPCError('unknown method: "{}"'.format(method), self.METHOD_NOT_FOUND) return await handler(params) From 0eb9e9731e8f998abe8364fab25f6da01d57e93a Mon Sep 17 00:00:00 2001 From: Johann Bauer Date: Sun, 13 Nov 2016 21:46:19 +0100 Subject: [PATCH 02/21] Add unit tests for the methods in util --- tests/test_util.py | 48 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) create mode 100644 tests/test_util.py diff --git a/tests/test_util.py b/tests/test_util.py new file mode 100644 index 0000000..0e92deb --- /dev/null +++ b/tests/test_util.py @@ -0,0 +1,48 @@ +from lib import util + + +def test_cachedproperty(): + class Target: + def __init__(self): + self.call_count = 0 + + @util.cachedproperty + def prop(self): + self.call_count += 1 + return self.call_count + + t = Target() + assert t.prop == t.prop == 1 + + +def test_deep_getsizeof(): + int_t = util.deep_getsizeof(1) + assert util.deep_getsizeof([1, 1]) > 2 * int_t + assert util.deep_getsizeof({1: 1}) > 2 * int_t + assert util.deep_getsizeof({1: {1: 1}}) > 3 * int_t + + +class Base: + pass + + +class A(Base): + pass + + +class B(Base): + pass + + +def test_subclasses(): + assert util.subclasses(Base) == [A, B] + + +def test_chunks(): + assert 
list(util.chunks([1, 2, 3, 4, 5], 2)) == [[1, 2], [3, 4], [5]] + + +def test_increment_byte_string(): + assert util.increment_byte_string(b'1') == b'2' + assert util.increment_byte_string(b'\x01\x01') == b'\x01\x02' + assert util.increment_byte_string(b'\xff\xff') == b'\x01\x00\x00' From 6849937628447250c8039752403e09e09c2bc256 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 14 Nov 2016 07:43:08 +0900 Subject: [PATCH 03/21] Add a couple more tests --- tests/test_util.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/tests/test_util.py b/tests/test_util.py index 0e92deb..a6b79ef 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -3,6 +3,9 @@ from lib import util def test_cachedproperty(): class Target: + + CALL_COUNT = 0 + def __init__(self): self.call_count = 0 @@ -11,8 +14,15 @@ def test_cachedproperty(): self.call_count += 1 return self.call_count + @util.cachedproperty + def cls_prop(cls): + cls.CALL_COUNT += 1 + return cls.CALL_COUNT + + t = Target() assert t.prop == t.prop == 1 + assert Target.cls_prop == Target.cls_prop == 1 def test_deep_getsizeof(): @@ -36,6 +46,7 @@ class B(Base): def test_subclasses(): assert util.subclasses(Base) == [A, B] + assert util.subclasses(Base, strict=False) == [A, B, Base] def test_chunks(): From a22a4650a597e6ff0d882c8b2862b79b9fbd181d Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 14 Nov 2016 07:54:59 +0900 Subject: [PATCH 04/21] Use json.dumps to print --- electrumx_rpc.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/electrumx_rpc.py b/electrumx_rpc.py index f7f05e1..4806e26 100755 --- a/electrumx_rpc.py +++ b/electrumx_rpc.py @@ -13,7 +13,6 @@ import argparse import asyncio import json -import pprint from functools import partial from os import environ @@ -67,7 +66,7 @@ class RPCClient(asyncio.Protocol): '{:,d}'.format(error_count), time_fmt(time))) else: - pprint.pprint(result, indent=4) + print(json.dumps(result, indent=4, sort_keys=True)) def main(): '''Send the 
RPC command to the server and print the result.''' From 1b8881800d30f8f1ccd7f755cc4134d0882ee132 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sun, 13 Nov 2016 23:33:42 +0900 Subject: [PATCH 05/21] Move the RPC handlers to the manager --- server/protocol.py | 44 ++++++++++++++++++++------------------------ 1 file changed, 20 insertions(+), 24 deletions(-) diff --git a/server/protocol.py b/server/protocol.py index e77b045..36528b2 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -158,20 +158,21 @@ class ServerManager(LoggedClass): def session_count(self): return len(self.sessions) - def info(self): - '''Returned in the RPC 'getinfo' call.''' - address_count = sum(len(session.hash168s) - for session in self.sessions - if isinstance(session, ElectrumX)) + def address_count(self): + return sum(len(session.hash168s) for session in self.sessions + if isinstance(session, ElectrumX)) + + async def rpc_getinfo(self, params): + '''The RPC 'getinfo' call.''' return { 'blocks': self.bp.height, - 'peers': len(self.irc_peers()), + 'peers': len(self.irc.peers), 'sessions': self.session_count(), - 'watched': address_count, + 'watched': self.address_count(), 'cached': 0, } - def sessions_info(self): + async def rpc_sessions(self, params): '''Returned to the RPC 'sessions' call.''' now = time.time() return [(session.kind, @@ -184,6 +185,15 @@ class ServerManager(LoggedClass): now - session.start) for session in self.sessions] + async def rpc_numsessions(self, params): + return self.session_count() + + async def rpc_peers(self, params): + return self.irc.peers + + async def rpc_numpeers(self, params): + return len(self.irc.peers) + class Session(JSONRPC): '''Base class of ElectrumX JSON session protocols.''' @@ -572,19 +582,5 @@ class LocalRPC(Session): def __init__(self, *args): super().__init__(*args) cmds = 'getinfo sessions numsessions peers numpeers'.split() - self.handlers = {cmd: getattr(self, cmd) for cmd in cmds} - - async def getinfo(self, params): - return 
self.manager.info() - - async def sessions(self, params): - return self.manager.sessions_info() - - async def numsessions(self, params): - return self.manager.session_count() - - async def peers(self, params): - return self.manager.irc_peers() - - async def numpeers(self, params): - return len(self.manager.irc_peers()) + self.handlers = {cmd: getattr(self.manager, 'rpc_{}'.format(cmd)) + for cmd in cmds} From 12ed9f7069c3d059e564699b0740655b5f3f1d7d Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 14 Nov 2016 08:01:00 +0900 Subject: [PATCH 06/21] Have session_count return a dictionary --- server/protocol.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/protocol.py b/server/protocol.py index 36528b2..4936604 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -156,7 +156,10 @@ class ServerManager(LoggedClass): return self.irc.peers def session_count(self): - return len(self.sessions) + '''Returns a dictionary.''' + active = len(s for s in self.sessions if s.send_count) + total = len(self.sessions) + return {'active': active, 'inert': total - active, 'total': total} def address_count(self): return sum(len(session.hash168s) for session in self.sessions From 95bb1588ab89424742f891d0393ac9b9ac09c621 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 14 Nov 2016 19:15:36 +0900 Subject: [PATCH 07/21] Add missing await --- lib/jsonrpc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/jsonrpc.py b/lib/jsonrpc.py index 62978f4..205beb3 100644 --- a/lib/jsonrpc.py +++ b/lib/jsonrpc.py @@ -172,7 +172,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): the connection is closing. 
''' if isinstance(request, list): - payload = self.batch_request_payload(request) + payload = await self.batch_request_payload(request) else: payload = await self.single_request_payload(request) From dc74c869a1d4024dde6d358a3e78b162ba58cb24 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 14 Nov 2016 19:24:17 +0900 Subject: [PATCH 08/21] Await task after cancelling it. --- server/protocol.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/protocol.py b/server/protocol.py index eeeb789..d433621 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -139,11 +139,11 @@ class ServerManager(LoggedClass): while True: task = await self.tasks.get() try: - if task.session in self.sessions: - self.current_task = task - await task.job - else: + if not task.session in self.sessions: + self.logger.info('cancelling task of gone session') task.job.cancel() + self.current_task = task + await task.job except asyncio.CancelledError: self.logger.info('cancelled task noted') except Exception: From 0725b54a4567ead738f9f9125702698f3291ed59 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 14 Nov 2016 19:52:02 +0900 Subject: [PATCH 09/21] Tweak RPC output --- electrumx_rpc.py | 53 ++++++++++++++++++++++++------------------------ 1 file changed, 27 insertions(+), 26 deletions(-) diff --git a/electrumx_rpc.py b/electrumx_rpc.py index 4806e26..c7a24b8 100755 --- a/electrumx_rpc.py +++ b/electrumx_rpc.py @@ -35,38 +35,39 @@ class RPCClient(asyncio.Protocol): data = json.dumps(payload) + '\n' self.transport.write(data.encode()) + def print_sessions(self, result): + def data_fmt(count, size): + return '{:,d}/{:,d}KB'.format(count, size // 1024) + def time_fmt(t): + t = int(t) + return ('{:3d}:{:02d}:{:02d}' + .format(t // 3600, (t % 3600) // 60, t % 60)) + + fmt = ('{:<4} {:>23} {:>15} {:>5} ' + '{:>7} {:>7} {:>7} {:>7} {:>5} {:>9}') + print(fmt.format('Type', 'Peer', 'Client', 'Subs', + 'Snt #', 'Snt KB', 'Rcv #', 'Rcv KB', + 'Errs', 'Time')) + for 
(kind, peer, subs, client, recv_count, recv_size, + send_count, send_size, error_count, time) in result: + print(fmt.format(kind, peer, client, '{:,d}'.format(subs), + '{:,d}'.format(recv_count), + '{:,d}'.format(recv_size // 1024), + '{:,d}'.format(send_count), + '{:,d}'.format(send_size // 1024), + '{:,d}'.format(error_count), + time_fmt(time))) + def data_received(self, data): payload = json.loads(data.decode()) self.transport.close() result = payload['result'] error = payload['error'] - if error: - print("ERROR: {}".format(error)) + if not error and self.method == 'sessions': + self.print_sessions(result) else: - def data_fmt(count, size): - return '{:,d}/{:,d}KB'.format(count, size // 1024) - def time_fmt(t): - t = int(t) - return ('{:3d}:{:02d}:{:02d}' - .format(t // 3600, (t % 3600) // 60, t % 60)) - - if self.method == 'sessions': - fmt = ('{:<4} {:>23} {:>15} {:>5} ' - '{:>7} {:>7} {:>7} {:>7} {:>5} {:>9}') - print(fmt.format('Type', 'Peer', 'Client', 'Subs', - 'Snt #', 'Snt MB', 'Rcv #', 'Rcv MB', - 'Errs', 'Time')) - for (kind, peer, subs, client, recv_count, recv_size, - send_count, send_size, error_count, time) in result: - print(fmt.format(kind, peer, client, '{:,d}'.format(subs), - '{:,d}'.format(recv_count), - '{:,.1f}'.format(recv_size / 1048576), - '{:,d}'.format(send_count), - '{:,.1f}'.format(send_size / 1048576), - '{:,d}'.format(error_count), - time_fmt(time))) - else: - print(json.dumps(result, indent=4, sort_keys=True)) + value = {'error': error} if error else result + print(json.dumps(value, indent=4, sort_keys=True)) def main(): '''Send the RPC command to the server and print the result.''' From 14d3d85bd9f2a527cbf990fd2c1f0148cb3f78b8 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 14 Nov 2016 19:52:18 +0900 Subject: [PATCH 10/21] Need an array to take its len --- server/protocol.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/protocol.py b/server/protocol.py index d433621..0711c05 100644 --- 
a/server/protocol.py +++ b/server/protocol.py @@ -157,7 +157,7 @@ class ServerManager(LoggedClass): def session_count(self): '''Returns a dictionary.''' - active = len(s for s in self.sessions if s.send_count) + active = len([s for s in self.sessions if s.send_count]) total = len(self.sessions) return {'active': active, 'inert': total - active, 'total': total} From 5fffc251880cbc8ea05a54428e94b4410d8eb752 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 14 Nov 2016 21:12:54 +0900 Subject: [PATCH 11/21] Get send and recv the right way round --- electrumx_rpc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/electrumx_rpc.py b/electrumx_rpc.py index c7a24b8..06aa56f 100755 --- a/electrumx_rpc.py +++ b/electrumx_rpc.py @@ -46,7 +46,7 @@ class RPCClient(asyncio.Protocol): fmt = ('{:<4} {:>23} {:>15} {:>5} ' '{:>7} {:>7} {:>7} {:>7} {:>5} {:>9}') print(fmt.format('Type', 'Peer', 'Client', 'Subs', - 'Snt #', 'Snt KB', 'Rcv #', 'Rcv KB', + 'Recv #', 'Recv KB', 'Sent #', 'Sent KB', 'Errs', 'Time')) for (kind, peer, subs, client, recv_count, recv_size, send_count, send_size, error_count, time) in result: From be45a9e22846278b59f26144fbc035963f7eace8 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 14 Nov 2016 21:15:09 +0900 Subject: [PATCH 12/21] Minor improvements to client handling Don't create a task until ready to execute it Log expensive tasks Assume peers are for logging unless explicitly not stated --- server/protocol.py | 41 ++++++++++++++++++++++++----------------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/server/protocol.py b/server/protocol.py index 0711c05..cf6eb35 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -50,7 +50,7 @@ class BlockServer(BlockProcessor): class ServerManager(LoggedClass): '''Manages the servers.''' - AsyncTask = namedtuple('AsyncTask', 'session job') + MgrTask = namedtuple('MgrTask', 'session task') def __init__(self, bp, env): super().__init__() @@ -59,7 +59,7 @@ class 
ServerManager(LoggedClass): self.servers = [] self.irc = IRC(env) self.sessions = set() - self.tasks = asyncio.Queue() + self.queue = asyncio.Queue() self.current_task = None async def start_server(self, kind, *args, **kw_args): @@ -127,27 +127,34 @@ class ServerManager(LoggedClass): self.sessions.remove(session) if self.current_task and session == self.current_task.session: self.logger.info('cancelling running task') - self.current_task.job.cancel() + self.current_task.task.cancel() - def add_task(self, session, job): + def add_task(self, session, request): assert session in self.sessions - task = asyncio.ensure_future(job) - self.tasks.put_nowait(self.AsyncTask(session, task)) + self.queue.put_nowait((session, request)) async def run_tasks(self): '''Asynchronously run through the task queue.''' while True: - task = await self.tasks.get() + session, request = await self.queue.get() + if not session in self.sessions: + continue + coro = session.handle_json_request(request) + task = asyncio.ensure_future(coro) try: - if not task.session in self.sessions: - self.logger.info('cancelling task of gone session') - task.job.cancel() - self.current_task = task - await task.job + self.current_task = self.MgrTask(session, task) + start = time.time() + await task + secs = time.time() - start + if secs > 1: + self.logger.warning('slow request for {} took {:.1f}s: {}' + .format(session.peername(), secs, + request)) except asyncio.CancelledError: self.logger.info('cancelled task noted') except Exception: # Getting here should probably be considered a bug and fixed + self.logger.error('error handling request {}'.format(request)) traceback.print_exc() finally: self.current_task = None @@ -179,7 +186,7 @@ class ServerManager(LoggedClass): '''Returned to the RPC 'sessions' call.''' now = time.time() return [(session.kind, - session.peername(), + session.peername(for_log=False), len(session.hash168s), 'RPC' if isinstance(session, LocalRPC) else session.client, session.recv_count, 
session.recv_size, @@ -215,7 +222,7 @@ class Session(JSONRPC): def connection_made(self, transport): '''Handle an incoming client connection.''' super().connection_made(transport) - self.logger.info('connection from {}'.format(self.peername(True))) + self.logger.info('connection from {}'.format(self.peername())) self.manager.add_session(self) def connection_lost(self, exc): @@ -224,7 +231,7 @@ class Session(JSONRPC): if self.error_count or self.send_size >= 250000: self.logger.info('{} disconnected. ' 'Sent {:,d} bytes in {:,d} messages {:,d} errors' - .format(self.peername(True), self.send_size, + .format(self.peername(), self.send_size, self.send_count, self.error_count)) self.manager.remove_session(self) @@ -234,9 +241,9 @@ class Session(JSONRPC): def on_json_request(self, request): '''Queue the request for asynchronous handling.''' - self.manager.add_task(self, self.handle_json_request(request)) + self.manager.add_task(self, request) - def peername(self, for_log=False): + def peername(self, *, for_log=True): # Anonymi{z, s}e all IP addresses that will be stored in a log if for_log and self.env.anon_logs and self.peer_info: info = ["XX.XX.XX.XX", "XX"] From 83674b1b7bce9f41813f84758d3a5df862289689 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 14 Nov 2016 21:28:42 +0900 Subject: [PATCH 13/21] Clarify peername() logic --- server/protocol.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/server/protocol.py b/server/protocol.py index cf6eb35..b107919 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -244,12 +244,12 @@ class Session(JSONRPC): self.manager.add_task(self, request) def peername(self, *, for_log=True): - # Anonymi{z, s}e all IP addresses that will be stored in a log - if for_log and self.env.anon_logs and self.peer_info: - info = ["XX.XX.XX.XX", "XX"] - else: - info = self.peer_info - return 'unknown' if not info else '{}:{}'.format(info[0], info[1]) + if not self.peer_info: + return 'unknown' + # 
Anonymize IP addresses that will be logged + if for_log and self.env.anon_logs: + return 'xx.xx.xx.xx:xx' + return '{}:{}'.format(self.peer_info[0], self.peer_info[1]) def tx_hash_from_param(self, param): '''Raise an RPCError if the parameter is not a valid transaction From 1830cae3def86032d5c3301e68e4770589532c5f Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 14 Nov 2016 21:52:25 +0900 Subject: [PATCH 14/21] Remove a redundant message --- server/protocol.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/protocol.py b/server/protocol.py index b107919..5b00520 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -126,7 +126,6 @@ class ServerManager(LoggedClass): def remove_session(self, session): self.sessions.remove(session) if self.current_task and session == self.current_task.session: - self.logger.info('cancelling running task') self.current_task.task.cancel() def add_task(self, session, request): @@ -151,7 +150,7 @@ class ServerManager(LoggedClass): .format(session.peername(), secs, request)) except asyncio.CancelledError: - self.logger.info('cancelled task noted') + self.logger.info('running task cancelled') except Exception: # Getting here should probably be considered a bug and fixed self.logger.error('error handling request {}'.format(request)) From 95cc97946566daae231b73074d40311d23594662 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 14 Nov 2016 23:12:50 +0900 Subject: [PATCH 15/21] Yield more during mempool load --- server/block_processor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/block_processor.py b/server/block_processor.py index afc97ff..9a89314 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -215,7 +215,7 @@ class MemPool(LoggedClass): for n, (hex_hash, tx) in enumerate(new_txs.items()): # Yield to process e.g. 
signals - if n % 500 == 0: + if n % 100 == 0: await asyncio.sleep(0) txout_pairs = [txout_pair(txout) for txout in tx.outputs] self.txs[hex_hash] = (None, txout_pairs, None) @@ -236,7 +236,7 @@ class MemPool(LoggedClass): # Now add the inputs for n, (hex_hash, tx) in enumerate(new_txs.items()): # Yield to process e.g. signals - if n % 50 == 0: + if n % 10 == 0: await asyncio.sleep(0) if initial and time.time() > next_log: From 8c55f41305753b9027b569382542f8dc6d05727b Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 14 Nov 2016 23:13:34 +0900 Subject: [PATCH 16/21] Accept more SSL protocols --- server/protocol.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/protocol.py b/server/protocol.py index 5b00520..05bb5f7 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -95,8 +95,8 @@ class ServerManager(LoggedClass): await self.start_server('TCP', env.host, env.tcp_port) if env.ssl_port is not None: - # FIXME: update if we want to require Python >= 3.5.3 - sslc = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) + # Python 3.5.3: use PROTOCOL_TLS + sslc = ssl.SSLContext(ssl.PROTOCOL_SSLv23) sslc.load_cert_chain(env.ssl_certfile, keyfile=env.ssl_keyfile) await self.start_server('SSL', env.host, env.ssl_port, ssl=sslc) From 8671e5718373efbe46207cb19398fe2a481e8710 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Tue, 15 Nov 2016 06:28:52 +0900 Subject: [PATCH 17/21] Move fs flushes to same time as history flushes --- server/block_processor.py | 57 +++++++++++++++++++++------------------ 1 file changed, 31 insertions(+), 26 deletions(-) diff --git a/server/block_processor.py b/server/block_processor.py index 9a89314..9b5a8e9 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -319,6 +319,8 @@ class BlockProcessor(server.db.DB): super().__init__(env) # These are our state as we move ahead of DB state + self.fs_height = self.db_height + self.fs_tx_count = self.db_tx_count self.height = self.db_height self.tip = self.db_tip 
self.tx_count = self.db_tx_count @@ -526,7 +528,8 @@ class BlockProcessor(server.db.DB): def assert_flushed(self): '''Asserts state is fully flushed.''' - assert self.tx_count == self.db_tx_count + assert self.tx_count == self.fs_tx_count == self.db_tx_count + assert self.height == self.fs_height == self.db_height assert not self.history assert not self.utxo_cache assert not self.db_cache @@ -563,9 +566,10 @@ class BlockProcessor(server.db.DB): # time it took to commit the batch self.flush_state(self.db) - self.logger.info('flush #{:,d} to height {:,d} txs: {:,d} took {:,d}s' + self.logger.info('flush #{:,d} to height {:,d} txs: {:,d} ' + 'took {:,.1f}s' .format(self.flush_count, self.height, self.tx_count, - int(self.last_flush - flush_start))) + self.last_flush - flush_start)) # Catch-up stats if show_stats: @@ -591,7 +595,12 @@ class BlockProcessor(server.db.DB): formatted_time(tx_est / this_tx_per_sec))) def flush_history(self, batch): - flush_start = time.time() + fs_flush_start = time.time() + self.fs_flush() + fs_flush_end = time.time() + self.logger.info('FS flush took {:.1f} seconds' + .format(fs_flush_end - fs_flush_start)) + flush_id = pack('>H', self.flush_count) for hash168, hist in self.history.items(): @@ -599,21 +608,21 @@ class BlockProcessor(server.db.DB): batch.put(key, hist.tobytes()) self.logger.info('flushed {:,d} history entries for {:,d} addrs ' - 'in {:,d}s' + 'in {:.1f}s' .format(self.history_size, len(self.history), - int(time.time() - flush_start))) + time.time() - fs_flush_end)) self.history = defaultdict(partial(array.array, 'I')) self.history_size = 0 def fs_flush(self): '''Flush the things stored on the filesystem.''' blocks_done = len(self.headers) - prior_tx_count = (self.tx_counts[self.db_height] - if self.db_height >= 0 else 0) + prior_tx_count = (self.tx_counts[self.fs_height] + if self.fs_height >= 0 else 0) cur_tx_count = self.tx_counts[-1] if self.tx_counts else 0 txs_done = cur_tx_count - prior_tx_count - assert 
self.db_height + blocks_done == self.height + assert self.fs_height + blocks_done == self.height assert len(self.tx_hashes) == blocks_done assert len(self.tx_counts) == self.height + 1 assert cur_tx_count == self.tx_count, \ @@ -622,13 +631,13 @@ class BlockProcessor(server.db.DB): # First the headers headers = b''.join(self.headers) header_len = self.coin.HEADER_LEN - self.headers_file.seek((self.db_height + 1) * header_len) + self.headers_file.seek((self.fs_height + 1) * header_len) self.headers_file.write(headers) self.headers_file.flush() # Then the tx counts - self.txcount_file.seek((self.db_height + 1) * self.tx_counts.itemsize) - self.txcount_file.write(self.tx_counts[self.db_height + 1:]) + self.txcount_file.seek((self.fs_height + 1) * self.tx_counts.itemsize) + self.txcount_file.write(self.tx_counts[self.fs_height + 1:]) self.txcount_file.flush() # Finally the hashes @@ -648,7 +657,8 @@ class BlockProcessor(server.db.DB): file_pos += size os.sync() - + self.fs_height = self.height + self.fs_tx_count = self.tx_count self.tx_hashes = [] self.headers = [] @@ -692,9 +702,9 @@ class BlockProcessor(server.db.DB): utxo_cache_size = len(self.utxo_cache) * 187 db_cache_size = len(self.db_cache) * 105 hist_cache_size = len(self.history) * 180 + self.history_size * 4 - tx_hash_size = (self.tx_count - self.db_tx_count) * 74 - utxo_MB = (db_cache_size + utxo_cache_size + tx_hash_size) // one_MB - hist_MB = hist_cache_size // one_MB + tx_hash_size = (self.tx_count - self.fs_tx_count) * 74 + utxo_MB = (db_cache_size + utxo_cache_size) // one_MB + hist_MB = (hist_cache_size + tx_hash_size) // one_MB self.logger.info('UTXOs: {:,d} deletes: {:,d} ' 'UTXOs {:,d}MB hist {:,d}MB' @@ -978,6 +988,7 @@ class BlockProcessor(server.db.DB): # Care is needed because the writes generated by flushing the # UTXO state may have keys in common with our write cache or # may be in the DB already. 
+ flush_start = time.time() self.logger.info('flushing {:,d} blocks with {:,d} txs' .format(self.height - self.db_height, self.tx_count - self.db_tx_count)) @@ -987,12 +998,6 @@ class BlockProcessor(server.db.DB): self.utxo_cache_spends, self.db_deletes)) - fs_flush_start = time.time() - self.fs_flush() - fs_flush_end = time.time() - self.logger.info('FS flush took {:.1f} seconds' - .format(fs_flush_end - fs_flush_start)) - collisions = 0 new_utxos = len(self.utxo_cache) @@ -1031,18 +1036,18 @@ class BlockProcessor(server.db.DB): self.db_tip = self.tip self.logger.info('UTXO flush took {:.1f} seconds' - .format(time.time() - fs_flush_end)) + .format(time.time() - flush_start)) def read_headers(self, start, count): # Read some from disk - disk_count = min(count, self.db_height + 1 - start) + disk_count = min(count, self.fs_height + 1 - start) result = self.fs_read_headers(start, disk_count) count -= disk_count start += disk_count # The rest from memory if count: - start -= self.db_height + 1 + start -= self.fs_height + 1 if not (count >= 0 and start + count <= len(self.headers)): raise ChainError('{:,d} headers starting at {:,d} not on disk' .format(count, start)) @@ -1056,7 +1061,7 @@ class BlockProcessor(server.db.DB): # Is this unflushed? 
if tx_hash is None: - tx_hashes = self.tx_hashes[tx_height - (self.db_height + 1)] + tx_hashes = self.tx_hashes[tx_height - (self.fs_height + 1)] tx_hash = tx_hashes[tx_num - self.tx_counts[tx_height - 1]] return tx_hash, tx_height From a2280bbc939abfcefbd2a5968252c1ed577f4fec Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Tue, 15 Nov 2016 07:59:45 +0900 Subject: [PATCH 18/21] Clean up futures handling --- electrumx_server.py | 9 ++-- server/block_processor.py | 19 ++++++--- server/protocol.py | 90 +++++++++++++++++++-------------------- 3 files changed, 62 insertions(+), 56 deletions(-) diff --git a/electrumx_server.py b/electrumx_server.py index 939bd2e..a0a505e 100755 --- a/electrumx_server.py +++ b/electrumx_server.py @@ -32,22 +32,21 @@ def main_loop(): def on_signal(signame): '''Call on receipt of a signal to cleanly shutdown.''' logging.warning('received {} signal, shutting down'.format(signame)) - for task in asyncio.Task.all_tasks(): - task.cancel() + future.cancel() + + server = BlockServer(Env()) + future = asyncio.ensure_future(server.main_loop()) # Install signal handlers for signame in ('SIGINT', 'SIGTERM'): loop.add_signal_handler(getattr(signal, signame), partial(on_signal, signame)) - server = BlockServer(Env()) - future = server.start() try: loop.run_until_complete(future) except asyncio.CancelledError: pass finally: - server.stop() loop.close() diff --git a/server/block_processor.py b/server/block_processor.py index 9b5a8e9..cc959c5 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -329,6 +329,7 @@ class BlockProcessor(server.db.DB): self.daemon.debug_set_height(self.height) self.mempool = MemPool(self) self.touched = set() + self.futures = [] # Meta self.utxo_MB = env.utxo_MB @@ -371,24 +372,30 @@ class BlockProcessor(server.db.DB): self.clean_db() - def start(self): - '''Returns a future that starts the block processor when awaited.''' - return asyncio.gather(self.main_loop(), - self.prefetcher.main_loop()) - async def 
main_loop(self): '''Main loop for block processing. Safely flushes the DB on clean shutdown. ''' + self.futures.append(asyncio.ensure_future(self.prefetcher.main_loop())) try: while True: await self._wait_for_update() await asyncio.sleep(0) # Yield except asyncio.CancelledError: - self.flush(True) + self.on_cancel() + # This lets the asyncio subsystem process futures cancellations + await asyncio.sleep(0) raise + def on_cancel(self): + '''Called when the main loop is cancelled. + + Intended to be overridden in derived classes.''' + for future in self.futures: + future.cancel() + self.flush(True) + async def _wait_for_update(self): '''Wait for the prefetcher to deliver blocks or a mempool update. diff --git a/server/protocol.py b/server/protocol.py index 05bb5f7..183a3da 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -42,9 +42,10 @@ class BlockServer(BlockProcessor): self.bs_caught_up = True self.server_mgr.notify(self.height, self.touched) - def stop(self): - '''Close the listening servers.''' + def on_cancel(self): + '''Called when the main loop is cancelled.''' self.server_mgr.stop() + super().on_cancel() class ServerManager(LoggedClass): @@ -58,9 +59,8 @@ class ServerManager(LoggedClass): self.env = env self.servers = [] self.irc = IRC(env) - self.sessions = set() - self.queue = asyncio.Queue() - self.current_task = None + self.sessions = {} + self.futures = [] # At present just the IRC future, if any async def start_server(self, kind, *args, **kw_args): loop = asyncio.get_event_loop() @@ -100,11 +100,9 @@ class ServerManager(LoggedClass): sslc.load_cert_chain(env.ssl_certfile, keyfile=env.ssl_keyfile) await self.start_server('SSL', env.host, env.ssl_port, ssl=sslc) - asyncio.ensure_future(self.run_tasks()) - if env.irc: self.logger.info('starting IRC coroutine') - asyncio.ensure_future(self.irc.start()) + self.futures.append(asyncio.ensure_future(self.irc.start())) else: self.logger.info('IRC disabled') @@ -115,48 +113,25 @@ class 
ServerManager(LoggedClass): ElectrumX.notify(sessions, height, touched) def stop(self): - '''Close the listening servers.''' + '''Close listening servers.''' for server in self.servers: server.close() + self.servers = [] + for future in self.futures: + future.cancel() + self.futures = [] + sessions = list(self.sessions.keys()) # A copy + for session in sessions: + self.remove_session(session) def add_session(self, session): assert session not in self.sessions - self.sessions.add(session) + coro = session.serve_requests() + self.sessions[session] = asyncio.ensure_future(coro) def remove_session(self, session): - self.sessions.remove(session) - if self.current_task and session == self.current_task.session: - self.current_task.task.cancel() - - def add_task(self, session, request): - assert session in self.sessions - self.queue.put_nowait((session, request)) - - async def run_tasks(self): - '''Asynchronously run through the task queue.''' - while True: - session, request = await self.queue.get() - if not session in self.sessions: - continue - coro = session.handle_json_request(request) - task = asyncio.ensure_future(coro) - try: - self.current_task = self.MgrTask(session, task) - start = time.time() - await task - secs = time.time() - start - if secs > 1: - self.logger.warning('slow request for {} took {:.1f}s: {}' - .format(session.peername(), secs, - request)) - except asyncio.CancelledError: - self.logger.info('running task cancelled') - except Exception: - # Getting here should probably be considered a bug and fixed - self.logger.error('error handling request {}'.format(request)) - traceback.print_exc() - finally: - self.current_task = None + future = self.sessions.pop(session) + future.cancel() def irc_peers(self): return self.irc.peers @@ -205,7 +180,12 @@ class ServerManager(LoggedClass): class Session(JSONRPC): - '''Base class of ElectrumX JSON session protocols.''' + '''Base class of ElectrumX JSON session protocols. 
+
+    Each session runs its tasks in asynchronous parallelism with other
+    sessions.  To prevent some sessions blocking others, potentially
+    long-running requests should yield (not yet implemented).
+    '''
 
     def __init__(self, manager, bp, env, kind):
         super().__init__()
@@ -216,6 +196,8 @@ class Session(JSONRPC):
         self.coin = bp.coin
         self.kind = kind
         self.hash168s = set()
+        self.requests = asyncio.Queue()
+        self.current_task = None
         self.client = 'unknown'
 
     def connection_made(self, transport):
@@ -240,7 +222,25 @@ class Session(JSONRPC):
 
     def on_json_request(self, request):
         '''Queue the request for asynchronous handling.'''
-        self.manager.add_task(self, request)
+        self.requests.put_nowait(request)
+
+    async def serve_requests(self):
+        '''Asynchronously run through the task queue.'''
+        while True:
+            await asyncio.sleep(0)
+            request = await self.requests.get()
+            try:
+                start = time.time()
+                await self.handle_json_request(request)
+                secs = time.time() - start
+                if secs > 1:
+                    self.logger.warning('slow request for {} took {:.1f}s: {}'
+                                        .format(session.peername(), secs,
+                                                request))
+            except Exception:
+                # Getting here should probably be considered a bug and fixed
+                self.logger.error('error handling request {}'.format(request))
+                traceback.print_exc()
 
     def peername(self, *, for_log=True):
         if not self.peer_info:

From db187540d509b8b39219622a0e1755de096371b6 Mon Sep 17 00:00:00 2001
From: Neil Booth
Date: Tue, 15 Nov 2016 08:12:52 +0900
Subject: [PATCH 19/21] Have task coros handle cancelled exceptions

---
 server/block_processor.py | 2 ++
 server/irc.py             | 6 ++++++
 server/protocol.py        | 4 +++-
 3 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/server/block_processor.py b/server/block_processor.py
index cc959c5..9829497 100644
--- a/server/block_processor.py
+++ b/server/block_processor.py
@@ -91,6 +91,8 @@ class Prefetcher(LoggedClass):
                 await asyncio.sleep(0)
             except DaemonError as e:
                 self.logger.info('ignoring daemon error: {}'.format(e))
+            except asyncio.CancelledError:
+ break async def _caught_up(self): '''Poll for new blocks and mempool state. diff --git a/server/irc.py b/server/irc.py index a23bd3b..0a39dd8 100644 --- a/server/irc.py +++ b/server/irc.py @@ -51,6 +51,12 @@ class IRC(LoggedClass): self.peers = {} async def start(self): + try: + await self.join() + except asyncio.CancelledError: + pass + + async def join(self): import irc.client as irc_client self.logger.info('joining IRC with nick "{}" and real name "{}"' diff --git a/server/protocol.py b/server/protocol.py index 183a3da..6eda042 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -235,8 +235,10 @@ class Session(JSONRPC): secs = time.time() - start if secs > 1: self.logger.warning('slow request for {} took {:.1f}s: {}' - .format(session.peername(), secs, + .format(self.peername(), secs, request)) + except asyncio.CancelledError: + break except Exception: # Getting here should probably be considered a bug and fixed self.logger.error('error handling request {}'.format(request)) From d85034353f0856288c4e73d0c13a8529c210f109 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Tue, 15 Nov 2016 20:13:50 +0900 Subject: [PATCH 20/21] Remove useless re-raise of cancellation --- electrumx_server.py | 8 ++------ server/block_processor.py | 1 - 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/electrumx_server.py b/electrumx_server.py index a0a505e..d851aa6 100755 --- a/electrumx_server.py +++ b/electrumx_server.py @@ -42,12 +42,8 @@ def main_loop(): loop.add_signal_handler(getattr(signal, signame), partial(on_signal, signame)) - try: - loop.run_until_complete(future) - except asyncio.CancelledError: - pass - finally: - loop.close() + loop.run_until_complete(future) + loop.close() def main(): diff --git a/server/block_processor.py b/server/block_processor.py index 9829497..63a3c88 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -388,7 +388,6 @@ class BlockProcessor(server.db.DB): self.on_cancel() # This lets the asyncio subsystem 
process futures cancellations
             await asyncio.sleep(0)
-            raise
 
     def on_cancel(self):
         '''Called when the main loop is cancelled.

From b60eb5ca171c9cfc7a7520e12dce9aae2621a0ca Mon Sep 17 00:00:00 2001
From: Neil Booth
Date: Tue, 15 Nov 2016 21:34:08 +0900
Subject: [PATCH 21/21] Prepare 0.5.1

---
 docs/RELEASE-NOTES | 40 ++++++++++++++++++++++++++++++++++++++++
 server/version.py  |  2 +-
 2 files changed, 41 insertions(+), 1 deletion(-)

diff --git a/docs/RELEASE-NOTES b/docs/RELEASE-NOTES
index fbbc463..a599d6c 100644
--- a/docs/RELEASE-NOTES
+++ b/docs/RELEASE-NOTES
@@ -1,3 +1,43 @@
+version 0.5.1
+-------------
+
+- 0.5 changed some cache defaults, only partially intentionally.  For
+  some users, including me, the result was a regression (a 15hr HDD
+  sync became a 20hr sync).  Another user reported their fastest sync
+  yet (sub 10hr SSD sync).  What changed was memory accounting - all
+  releases until 0.5 were not properly accounting for memory usage of
+  unflushed transaction hashes.  In 0.5 they were accounted for in the
+  UTXO cache, which resulted in much earlier flushes.  0.5.1 flushes
+  the hashes at the same time as history so I now account for it
+  towards the history cache limit.  To get a reasonable comparison
+  with prior releases your HIST_MB environment variable should be
+  bumped by about 15% from 0.4 and earlier values.  This will not
+  result in greater memory consumption - the additional memory
+  consumption was being ignored before but is now being included.
+- 0.5.1 is the first release where Electrum client requests are queued
+  on a per-session basis.  Previously they were in a global queue.
+  This is the beginning of ensuring that expensive / DOS requests
+  mostly affect that user's session and not those of other users.  The
+  goal is that each session's requests run asynchronously parallel to
+  every other session's requests.
The missing part of the puzzle is + that Python's asyncio is co-operative, however at the moment + ElectrumX does not yield during expensive requests. I intend that a + near upcoming release will ensure expensive requests yield the CPU + at regular fine-grained intervals. The intended result is that, to + some extent, expensive requests mainly delay that and later requests + from the same session, and have minimal impact on the legitimate + requests of other sessions. The extent to which this goal is + achieved will only be verifiable in practice. +- more robust tracking and handling of asynchronous tasks. I hope + this will reduce asyncio's logging messages, some of which I'm + becoming increasingly convinced I have no control over. In + particular I learned earlier releases were unintentionally limiting + the universe of acceptable SSL protocols, and so I made them the + default that had been intended. +- I added logging of expensive tasks, though I don't expect much real + information from this +- various RPC improvements + version 0.5 ----------- diff --git a/server/version.py b/server/version.py index c49ba4d..77fe367 100644 --- a/server/version.py +++ b/server/version.py @@ -1 +1 @@ -VERSION = "ElectrumX 0.5" +VERSION = "ElectrumX 0.5.1"