From 0c8d5ddf634081903f77f2cc68f63c4e4f4abeca Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 6 Aug 2018 21:38:28 +0900 Subject: [PATCH 01/38] Mark as dev branch --- electrumx/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/electrumx/__init__.py b/electrumx/__init__.py index 475394d..f0f8917 100644 --- a/electrumx/__init__.py +++ b/electrumx/__init__.py @@ -1,4 +1,4 @@ -version = 'ElectrumX 1.8' +version = 'ElectrumX 1.8.1-dev' version_short = version.split()[-1] from electrumx.server.controller import Controller From a036a2eb3f0490f2f678f11b8544077ec6e8e2cd Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 6 Aug 2018 20:22:01 +0900 Subject: [PATCH 02/38] Rename get_utxos to all_utxos. - no longer takes a limit - runs in a thread to avoid blocking --- contrib/query.py | 7 ++++-- electrumx/server/chain_state.py | 15 +++++-------- electrumx/server/db.py | 39 +++++++++++++++------------------ electrumx/server/peers.py | 2 +- electrumx/server/session.py | 4 ++-- 5 files changed, 32 insertions(+), 35 deletions(-) diff --git a/contrib/query.py b/contrib/query.py index 955ff37..1a85dd7 100755 --- a/contrib/query.py +++ b/contrib/query.py @@ -80,13 +80,16 @@ async def query(args): if n is None: print('No history found') n = None - for n, utxo in enumerate(db.get_utxos(hashX, limit), start=1): + utxos = await db.all_utxos(hashX) + for n, utxo in enumerate(utxos, start=1): print(f'UTXO #{n:,d}: tx_hash {hash_to_hex_str(utxo.tx_hash)} ' f'tx_pos {utxo.tx_pos:,d} height {utxo.height:,d} ' f'value {utxo.value:,d}') + if n == limit: + break if n is None: print('No UTXOs found') - balance = db.get_balance(hashX) + balance = sum(utxo.value for utxo in utxos) print(f'Balance: {coin.decimal_value(balance):,f} {coin.SHORTNAME}') diff --git a/electrumx/server/chain_state.py b/electrumx/server/chain_state.py index 38b4e7b..b950bf4 100644 --- a/electrumx/server/chain_state.py +++ b/electrumx/server/chain_state.py @@ -27,6 +27,7 @@ class 
ChainState(object): self.force_chain_reorg = self._bp.force_chain_reorg self.tx_branch_and_root = self._bp.merkle.branch_and_root self.read_headers = self._bp.read_headers + self.all_utxos = self._bp.all_utxos async def broadcast_transaction(self, raw_tx): return await self._daemon.sendrawtransaction([raw_tx]) @@ -57,13 +58,6 @@ class ChainState(object): return await run_in_thread(job) - async def get_utxos(self, hashX): - '''Get UTXOs asynchronously to reduce latency.''' - def job(): - return list(self._bp.get_utxos(hashX, limit=None)) - - return await run_in_thread(job) - def header_branch_and_root(self, length, height): return self._bp.header_mc.branch_and_root(length, height) @@ -115,15 +109,18 @@ class ChainState(object): if n is None: lines.append('No history found') n = None - for n, utxo in enumerate(db.get_utxos(hashX, limit), start=1): + utxos = await db.all_utxos(hashX) + for n, utxo in enumerate(utxos, start=1): lines.append(f'UTXO #{n:,d}: tx_hash ' f'{hash_to_hex_str(utxo.tx_hash)} ' f'tx_pos {utxo.tx_pos:,d} height ' f'{utxo.height:,d} value {utxo.value:,d}') + if n == limit: + break if n is None: lines.append('No UTXOs found') - balance = db.get_balance(hashX) + balance = sum(utxo.value for utxo in utxos) lines.append(f'Balance: {coin.decimal_value(balance):,f} ' f'{coin.SHORTNAME}') diff --git a/electrumx/server/db.py b/electrumx/server/db.py index 8abb0d8..c288ff3 100644 --- a/electrumx/server/db.py +++ b/electrumx/server/db.py @@ -375,28 +375,25 @@ class DB(object): with self.utxo_db.write_batch() as batch: self.write_utxo_state(batch) - def get_balance(self, hashX): - '''Returns the confirmed balance of an address.''' - return sum(utxo.value for utxo in self.get_utxos(hashX, limit=None)) - - def get_utxos(self, hashX, limit=1000): - '''Generator that yields all UTXOs for an address sorted in no - particular order. By default yields at most 1000 entries. - Set limit to None to get them all. 
+ async def all_utxos(self, hashX): + '''Return all UTXOs for an address sorted in no particular order. By + default yields at most 1000 entries. ''' - limit = util.resolve_limit(limit) - s_unpack = unpack - # Key: b'u' + address_hashX + tx_idx + tx_num - # Value: the UTXO value as a 64-bit unsigned integer - prefix = b'u' + hashX - for db_key, db_value in self.utxo_db.iterator(prefix=prefix): - if limit == 0: - return - limit -= 1 - tx_pos, tx_num = s_unpack(' Date: Mon, 6 Aug 2018 20:46:09 +0900 Subject: [PATCH 03/38] Rename get_history to limited_history - make it async and run in a thread --- contrib/query.py | 4 ++-- electrumx/server/chain_state.py | 19 +++---------------- electrumx/server/db.py | 20 ++++++++++++-------- electrumx/server/session.py | 14 ++++++++++---- 4 files changed, 27 insertions(+), 30 deletions(-) diff --git a/contrib/query.py b/contrib/query.py index 1a85dd7..60f1696 100755 --- a/contrib/query.py +++ b/contrib/query.py @@ -73,8 +73,8 @@ async def query(args): if not hashX: continue n = None - for n, (tx_hash, height) in enumerate(db.get_history(hashX, limit), - start=1): + history = await db.limited_history(hashX, limit=limit) + for n, (tx_hash, height) in enumerate(history, start=1): print(f'History #{n:,d}: height {height:,d} ' f'tx_hash {hash_to_hex_str(tx_hash)}') if n is None: diff --git a/electrumx/server/chain_state.py b/electrumx/server/chain_state.py index b950bf4..d9d6fb2 100644 --- a/electrumx/server/chain_state.py +++ b/electrumx/server/chain_state.py @@ -8,8 +8,6 @@ import asyncio -from aiorpcx import run_in_thread - from electrumx.lib.hash import hash_to_hex_str @@ -28,6 +26,7 @@ class ChainState(object): self.tx_branch_and_root = self._bp.merkle.branch_and_root self.read_headers = self._bp.read_headers self.all_utxos = self._bp.all_utxos + self.limited_history = self._bp.limited_history async def broadcast_transaction(self, raw_tx): return await self._daemon.sendrawtransaction([raw_tx]) @@ -46,18 +45,6 @@ class 
ChainState(object): 'db_height': self.db_height(), } - async def get_history(self, hashX): - '''Get history asynchronously to reduce latency.''' - def job(): - # History DoS limit. Each element of history is about 99 - # bytes when encoded as JSON. This limits resource usage - # on bloated history requests, and uses a smaller divisor - # so large requests are logged before refusing them. - limit = self._env.max_send // 97 - return list(self._bp.get_history(hashX, limit=limit)) - - return await run_in_thread(job) - def header_branch_and_root(self, length, height): return self._bp.header_mc.branch_and_root(length, height) @@ -102,8 +89,8 @@ class ChainState(object): if not hashX: continue n = None - for n, (tx_hash, height) in enumerate( - db.get_history(hashX, limit), start=1): + history = await db.limited_history(hashX, limit=limit) + for n, (tx_hash, height) in enumerate(history): lines.append(f'History #{n:,d}: height {height:,d} ' f'tx_hash {hash_to_hex_str(tx_hash)}') if n is None: diff --git a/electrumx/server/db.py b/electrumx/server/db.py index c288ff3..3eeadb4 100644 --- a/electrumx/server/db.py +++ b/electrumx/server/db.py @@ -227,15 +227,19 @@ class DB(object): return [self.coin.header_hash(header) for header in headers] - def get_history(self, hashX, limit=1000): - '''Generator that returns an unpruned, sorted list of (tx_hash, - height) tuples of confirmed transactions that touched the address, - earliest in the blockchain first. Includes both spending and - receiving transactions. By default yields at most 1000 entries. - Set limit to None to get them all. + async def limited_history(self, hashX, *, limit=1000): + '''Return an unpruned, sorted list of (tx_hash, height) tuples of + confirmed transactions that touched the address, earliest in + the blockchain first. Includes both spending and receiving + transactions. By default returns at most 1000 entries. Set + limit to None to get them all. 
''' - for tx_num in self.history.get_txnums(hashX, limit): - yield self.fs_tx_hash(tx_num) + def read_history(): + tx_nums = list(self.history.get_txnums(hashX, limit)) + fs_tx_hash = self.fs_tx_hash + return [fs_tx_hash(tx_num) for tx_num in tx_nums] + + return await run_in_thread(read_history) # -- Undo information diff --git a/electrumx/server/session.py b/electrumx/server/session.py index d5d89eb..e450108 100644 --- a/electrumx/server/session.py +++ b/electrumx/server/session.py @@ -444,11 +444,17 @@ class SessionManager(object): '''The number of connections that we've sent something to.''' return len(self.sessions) - async def get_history(self, hashX): + async def limited_history(self, hashX): '''A caching layer.''' hc = self._history_cache if hashX not in hc: - hc[hashX] = await self.chain_state.get_history(hashX) + # History DoS limit. Each element of history is about 99 + # bytes when encoded as JSON. This limits resource usage + # on bloated history requests, and uses a smaller divisor + # so large requests are logged before refusing them. 
+ limit = self.env.max_send // 97 + hc[hashX] = await self.chain_state.limited_history(hashX, + limit=limit) return hc[hashX] async def _notify_sessions(self, height, touched): @@ -773,7 +779,7 @@ class ElectrumX(SessionBase): ''' # Note history is ordered and mempool unordered in electrum-server # For mempool, height is -1 if unconfirmed txins, otherwise 0 - history = await self.session_mgr.get_history(hashX) + history = await self.session_mgr.limited_history(hashX) mempool = await self.mempool.transaction_summaries(hashX) status = ''.join('{}:{:d}:'.format(hash_to_hex_str(tx_hash), height) @@ -873,7 +879,7 @@ class ElectrumX(SessionBase): async def confirmed_and_unconfirmed_history(self, hashX): # Note history is ordered but unconfirmed is unordered in e-s - history = await self.session_mgr.get_history(hashX) + history = await self.session_mgr.limited_history(hashX) conf = [{'tx_hash': hash_to_hex_str(tx_hash), 'height': height} for tx_hash, height in history] return conf + await self.unconfirmed_history(hashX) From f24b022fa48f5cfc7a3cca9d20478d57fb3a0c71 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 6 Aug 2018 20:51:35 +0900 Subject: [PATCH 04/38] Remove dead code --- electrumx/server/chain_state.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/electrumx/server/chain_state.py b/electrumx/server/chain_state.py index d9d6fb2..4704fbf 100644 --- a/electrumx/server/chain_state.py +++ b/electrumx/server/chain_state.py @@ -48,10 +48,6 @@ class ChainState(object): def header_branch_and_root(self, length, height): return self._bp.header_mc.branch_and_root(length, height) - def processing_new_block(self): - '''Return True if we're processing a new block.''' - return self._daemon.cached_height() > self.db_height() - def raw_header(self, height): '''Return the binary header at the given height.''' header, n = self._bp.read_headers(height, 1) From db5d5167562f99b61e1a0aee5303f780a7be346b Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 6 Aug 2018 20:57:48 
+0900 Subject: [PATCH 05/38] Make raw_header async --- electrumx/server/chain_state.py | 2 +- electrumx/server/peers.py | 2 +- electrumx/server/session.py | 48 +++++++++++++++++---------------- 3 files changed, 27 insertions(+), 25 deletions(-) diff --git a/electrumx/server/chain_state.py b/electrumx/server/chain_state.py index 4704fbf..58534db 100644 --- a/electrumx/server/chain_state.py +++ b/electrumx/server/chain_state.py @@ -48,7 +48,7 @@ class ChainState(object): def header_branch_and_root(self, length, height): return self._bp.header_mc.branch_and_root(length, height) - def raw_header(self, height): + async def raw_header(self, height): '''Return the binary header at the given height.''' header, n = self._bp.read_headers(height, 1) if n != 1: diff --git a/electrumx/server/peers.py b/electrumx/server/peers.py index 955d0e6..0c7ba1c 100644 --- a/electrumx/server/peers.py +++ b/electrumx/server/peers.py @@ -315,7 +315,7 @@ class PeerManager(object): # Check prior header too in case of hard fork. 
check_height = min(our_height, their_height) - raw_header = self.chain_state.raw_header(check_height) + raw_header = await self.chain_state.raw_header(check_height) if ptuple >= (1, 4): ours = raw_header.hex() message = 'blockchain.block.header' diff --git a/electrumx/server/session.py b/electrumx/server/session.py index e450108..33e6cea 100644 --- a/electrumx/server/session.py +++ b/electrumx/server/session.py @@ -61,6 +61,13 @@ def non_negative_integer(value): f'{value} should be a non-negative integer') +def assert_boolean(value): + '''Return param value it is boolean otherwise raise an RPCError.''' + if value in (False, True): + return value + raise RPCError(BAD_REQUEST, f'{value} should be a boolean value') + + def assert_tx_hash(value): '''Raise an RPCError if the value is not a valid transaction hash.''' @@ -712,7 +719,7 @@ class ElectrumX(SessionBase): if height_changed: self.notified_height = height if self.subscribe_headers: - args = (self.subscribe_headers_result(height), ) + args = (await self.subscribe_headers_result(height), ) await self.send_notification('blockchain.headers.subscribe', args) @@ -720,49 +727,44 @@ class ElectrumX(SessionBase): if touched or (height_changed and self.mempool_statuses): await self.notify_touched(touched) - def assert_boolean(self, value): - '''Return param value it is boolean otherwise raise an RPCError.''' - if value in (False, True): - return value - raise RPCError(BAD_REQUEST, f'{value} should be a boolean value') - - def raw_header(self, height): + async def raw_header(self, height): '''Return the binary header at the given height.''' try: - return self.chain_state.raw_header(height) + return await self.chain_state.raw_header(height) except IndexError: - raise RPCError(BAD_REQUEST, f'height {height:,d} out of range') + raise RPCError(BAD_REQUEST, f'height {height:,d} ' + 'out of range') from None - def electrum_header(self, height): + async def electrum_header(self, height): '''Return the deserialized header at the 
given height.''' - raw_header = self.raw_header(height) + raw_header = await self.raw_header(height) return self.coin.electrum_header(raw_header, height) - def subscribe_headers_result(self, height): + async def subscribe_headers_result(self, height): '''The result of a header subscription for the given height.''' if self.subscribe_headers_raw: - raw_header = self.raw_header(height) + raw_header = await self.raw_header(height) return {'hex': raw_header.hex(), 'height': height} - return self.electrum_header(height) + return await self.electrum_header(height) - def _headers_subscribe(self, raw): + async def _headers_subscribe(self, raw): '''Subscribe to get headers of new blocks.''' self.subscribe_headers = True - self.subscribe_headers_raw = self.assert_boolean(raw) + self.subscribe_headers_raw = assert_boolean(raw) self.notified_height = self.db_height() - return self.subscribe_headers_result(self.notified_height) + return await self.subscribe_headers_result(self.notified_height) async def headers_subscribe(self): '''Subscribe to get raw headers of new blocks.''' - return self._headers_subscribe(True) + return await self._headers_subscribe(True) async def headers_subscribe_True(self, raw=True): '''Subscribe to get headers of new blocks.''' - return self._headers_subscribe(raw) + return await self._headers_subscribe(raw) async def headers_subscribe_False(self, raw=False): '''Subscribe to get headers of new blocks.''' - return self._headers_subscribe(raw) + return await self._headers_subscribe(raw) async def add_peer(self, features): '''Add a peer (but only if the peer resolves to the source).''' @@ -925,7 +927,7 @@ class ElectrumX(SessionBase): dictionary with a merkle proof.''' height = non_negative_integer(height) cp_height = non_negative_integer(cp_height) - raw_header_hex = self.raw_header(height).hex() + raw_header_hex = (await self.raw_header(height)).hex() if cp_height == 0: return raw_header_hex result = {'header': raw_header_hex} @@ -976,7 +978,7 @@ class 
ElectrumX(SessionBase): height: the header's height''' height = non_negative_integer(height) - return self.electrum_header(height) + return await self.electrum_header(height) def is_tor(self): '''Try to detect if the connection is to a tor hidden service we are From 1efc8cb8ec2ecf21e516dd7019d75e9ac2987610 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 6 Aug 2018 21:27:33 +0900 Subject: [PATCH 06/38] Make the merkle cache and read_headers async read_headers runs in a thread to avoid blocking --- electrumx/lib/merkle.py | 37 +++++++----- electrumx/server/block_processor.py | 18 ++---- electrumx/server/chain_state.py | 4 +- electrumx/server/db.py | 28 +++++---- electrumx/server/session.py | 13 +++-- tests/lib/test_merkle.py | 88 +++++++++++++++++------------ 6 files changed, 105 insertions(+), 83 deletions(-) diff --git a/electrumx/lib/merkle.py b/electrumx/lib/merkle.py index 215879e..d8e5971 100644 --- a/electrumx/lib/merkle.py +++ b/electrumx/lib/merkle.py @@ -158,13 +158,16 @@ class Merkle(object): class MerkleCache(object): '''A cache to calculate merkle branches efficiently.''' - def __init__(self, merkle, source, length): - '''Initialise a cache of length hashes taken from source.''' + def __init__(self, merkle, source_func): + '''Initialise a cache hashes taken from source_func: + + async def source_func(index, count): + ... 
+ ''' self.merkle = merkle - self.source = source - self.length = length - self.depth_higher = merkle.tree_depth(length) // 2 - self.level = self._level(source.hashes(0, length)) + self.source_func = source_func + self.length = 0 + self.depth_higher = 0 def _segment_length(self): return 1 << self.depth_higher @@ -179,18 +182,18 @@ class MerkleCache(object): def _level(self, hashes): return self.merkle.level(hashes, self.depth_higher) - def _extend_to(self, length): + async def _extend_to(self, length): '''Extend the length of the cache if necessary.''' if length <= self.length: return # Start from the beginning of any final partial segment. # Retain the value of depth_higher; in practice this is fine start = self._leaf_start(self.length) - hashes = self.source.hashes(start, length - start) + hashes = await self.source_func(start, length - start) self.level[start >> self.depth_higher:] = self._level(hashes) self.length = length - def _level_for(self, length): + async def _level_for(self, length): '''Return a (level_length, final_hash) pair for a truncation of the hashes to the given length.''' if length == self.length: @@ -198,10 +201,16 @@ class MerkleCache(object): level = self.level[:length >> self.depth_higher] leaf_start = self._leaf_start(length) count = min(self._segment_length(), length - leaf_start) - hashes = self.source.hashes(leaf_start, count) + hashes = await self.source_func(leaf_start, count) level += self._level(hashes) return level + async def initialize(self, length): + '''Call to initialize the cache to a source of given length.''' + self.length = length + self.depth_higher = self.merkle.tree_depth(length) // 2 + self.level = self._level(await self.source_func(0, length)) + def truncate(self, length): '''Truncate the cache so it covers no more than length underlying hashes.''' @@ -215,7 +224,7 @@ class MerkleCache(object): self.length = length self.level[length >> self.depth_higher:] = [] - def branch_and_root(self, length, index): + async def 
branch_and_root(self, length, index): '''Return a merkle branch and root. Length is the number of hashes used to calculate the merkle root, index is the position of the hash to calculate the branch of. @@ -229,12 +238,12 @@ class MerkleCache(object): raise ValueError('length must be positive') if index >= length: raise ValueError('index must be less than length') - self._extend_to(length) + await self._extend_to(length) leaf_start = self._leaf_start(index) count = min(self._segment_length(), length - leaf_start) - leaf_hashes = self.source.hashes(leaf_start, count) + leaf_hashes = await self.source_func(leaf_start, count) if length < self._segment_length(): return self.merkle.branch_and_root(leaf_hashes, index) - level = self._level_for(length) + level = await self._level_for(length) return self.merkle.branch_and_root_from_level( level, leaf_hashes, index, self.depth_higher) diff --git a/electrumx/server/block_processor.py b/electrumx/server/block_processor.py index 1645bb7..08a72a6 100644 --- a/electrumx/server/block_processor.py +++ b/electrumx/server/block_processor.py @@ -139,12 +139,6 @@ class Prefetcher(object): return True -class HeaderSource(object): - - def __init__(self, db): - self.hashes = db.fs_block_hashes - - class ChainError(Exception): '''Raised on error processing blocks.''' @@ -174,7 +168,7 @@ class BlockProcessor(electrumx.server.db.DB): # Header merkle cache self.merkle = Merkle() - self.header_mc = None + self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes) # Caches of unflushed items. self.headers = [] @@ -251,9 +245,7 @@ class BlockProcessor(electrumx.server.db.DB): await self.run_in_thread_shielded(self.backup_blocks, raw_blocks) last -= len(raw_blocks) # Truncate header_mc: header count is 1 more than the height. - # Note header_mc is None if the reorg happens at startup. 
- if self.header_mc: - self.header_mc.truncate(self.height + 1) + self.header_mc.truncate(self.height + 1) await self.prefetcher.reset_height(self.height) async def reorg_hashes(self, count): @@ -269,7 +261,7 @@ class BlockProcessor(electrumx.server.db.DB): self.logger.info(f'chain was reorganised replacing {count:,d} ' f'block{s} at heights {start:,d}-{last:,d}') - return start, last, self.fs_block_hashes(start, count) + return start, last, await self.fs_block_hashes(start, count) async def calc_reorg_range(self, count): '''Calculate the reorg range''' @@ -287,7 +279,7 @@ class BlockProcessor(electrumx.server.db.DB): start = self.height - 1 count = 1 while start > 0: - hashes = self.fs_block_hashes(start, count) + hashes = await self.fs_block_hashes(start, count) hex_hashes = [hash_to_hex_str(hash) for hash in hashes] d_hex_hashes = await self.daemon.block_hex_hashes(start, count) n = diff_pos(hex_hashes, d_hex_hashes) @@ -774,7 +766,7 @@ class BlockProcessor(electrumx.server.db.DB): await self.open_for_serving() # Populate the header merkle cache length = max(1, self.height - self.env.reorg_limit) - self.header_mc = MerkleCache(self.merkle, HeaderSource(self), length) + await self.header_mc.initialize(length) self.logger.info('populated header merkle cache') async def _first_open_dbs(self): diff --git a/electrumx/server/chain_state.py b/electrumx/server/chain_state.py index 58534db..06c2d8c 100644 --- a/electrumx/server/chain_state.py +++ b/electrumx/server/chain_state.py @@ -45,12 +45,12 @@ class ChainState(object): 'db_height': self.db_height(), } - def header_branch_and_root(self, length, height): + async def header_branch_and_root(self, length, height): return self._bp.header_mc.branch_and_root(length, height) async def raw_header(self, height): '''Return the binary header at the given height.''' - header, n = self._bp.read_headers(height, 1) + header, n = await self.read_headers(height, 1) if n != 1: raise IndexError(f'height {height:,d} out of range') 
return header diff --git a/electrumx/server/db.py b/electrumx/server/db.py index 3eeadb4..a6177a3 100644 --- a/electrumx/server/db.py +++ b/electrumx/server/db.py @@ -182,7 +182,7 @@ class DB(object): offset = prior_tx_count * 32 self.hashes_file.write(offset, hashes) - def read_headers(self, start_height, count): + async def read_headers(self, start_height, count): '''Requires start_height >= 0, count >= 0. Reads as many headers as are available starting at start_height up to count. This would be zero if start_height is beyond self.db_height, for @@ -191,16 +191,20 @@ class DB(object): Returns a (binary, n) pair where binary is the concatenated binary headers, and n is the count of headers returned. ''' - # Read some from disk if start_height < 0 or count < 0: - raise self.DBError('{:,d} headers starting at {:,d} not on disk' - .format(count, start_height)) - disk_count = max(0, min(count, self.db_height + 1 - start_height)) - if disk_count: - offset = self.header_offset(start_height) - size = self.header_offset(start_height + disk_count) - offset - return self.headers_file.read(offset, size), disk_count - return b'', 0 + raise self.DBError(f'{count:,d} headers starting at ' + f'{start_height:,d} not on disk') + + def read_headers(): + # Read some from disk + disk_count = max(0, min(count, self.db_height + 1 - start_height)) + if disk_count: + offset = self.header_offset(start_height) + size = self.header_offset(start_height + disk_count) - offset + return self.headers_file.read(offset, size), disk_count + return b'', 0 + + return await run_in_thread(read_headers) def fs_tx_hash(self, tx_num): '''Return a par (tx_hash, tx_height) for the given tx number. 
@@ -213,8 +217,8 @@ class DB(object): tx_hash = self.hashes_file.read(tx_num * 32, 32) return tx_hash, tx_height - def fs_block_hashes(self, height, count): - headers_concat, headers_count = self.read_headers(height, count) + async def fs_block_hashes(self, height, count): + headers_concat, headers_count = await self.read_headers(height, count) if headers_count != count: raise self.DBError('only got {:,d} headers starting at {:,d}, not ' '{:,d}'.format(headers_count, height, count)) diff --git a/electrumx/server/session.py b/electrumx/server/session.py index 33e6cea..afa61f5 100644 --- a/electrumx/server/session.py +++ b/electrumx/server/session.py @@ -908,14 +908,14 @@ class ElectrumX(SessionBase): hashX = scripthash_to_hashX(scripthash) return await self.hashX_subscribe(hashX, scripthash) - def _merkle_proof(self, cp_height, height): + async def _merkle_proof(self, cp_height, height): max_height = self.db_height() if not height <= cp_height <= max_height: raise RPCError(BAD_REQUEST, f'require header height {height:,d} <= ' f'cp_height {cp_height:,d} <= ' f'chain height {max_height:,d}') - branch, root = self.chain_state.header_branch_and_root( + branch, root = await self.chain_state.header_branch_and_root( cp_height + 1, height) return { 'branch': [hash_to_hex_str(elt) for elt in branch], @@ -931,7 +931,7 @@ class ElectrumX(SessionBase): if cp_height == 0: return raw_header_hex result = {'header': raw_header_hex} - result.update(self._merkle_proof(cp_height, height)) + result.update(await self._merkle_proof(cp_height, height)) return result async def block_header_13(self, height): @@ -953,11 +953,12 @@ class ElectrumX(SessionBase): max_size = self.MAX_CHUNK_SIZE count = min(count, max_size) - headers, count = self.chain_state.read_headers(start_height, count) + headers, count = await self.chain_state.read_headers(start_height, + count) result = {'hex': headers.hex(), 'count': count, 'max': max_size} if count and cp_height: last_height = start_height + count - 1 - 
result.update(self._merkle_proof(cp_height, last_height)) + result.update(await self._merkle_proof(cp_height, last_height)) return result async def block_headers_12(self, start_height, count): @@ -970,7 +971,7 @@ class ElectrumX(SessionBase): index = non_negative_integer(index) size = self.coin.CHUNK_SIZE start_height = index * size - headers, count = self.chain_state.read_headers(start_height, size) + headers, _ = await self.chain_state.read_headers(start_height, size) return headers.hex() async def block_get_header(self, height): diff --git a/tests/lib/test_merkle.py b/tests/lib/test_merkle.py index dd9d9ff..35b9c65 100644 --- a/tests/lib/test_merkle.py +++ b/tests/lib/test_merkle.py @@ -149,72 +149,83 @@ class Source(object): def __init__(self, length): self._hashes = [os.urandom(32) for _ in range(length)] - def hashes(self, start, count): + async def hashes(self, start, count): assert start >= 0 assert start + count <= len(self._hashes) return self._hashes[start: start + count] -def test_merkle_cache(): +@pytest.mark.asyncio +async def test_merkle_cache(): lengths = (*range(1, 18), 31, 32, 33, 57) - source = Source(max(lengths)) + source = Source(max(lengths)).hashes for length in lengths: - cache = MerkleCache(merkle, source, length) + cache = MerkleCache(merkle, source) + await cache.initialize(length) # Simulate all possible checkpoints for cp_length in range(1, length + 1): - cp_hashes = source.hashes(0, cp_length) + cp_hashes = await source(0, cp_length) # All possible indices for index in range(cp_length): # Compare correct answer with cache branch, root = merkle.branch_and_root(cp_hashes, index) - branch2, root2 = cache.branch_and_root(cp_length, index) + branch2, root2 = await cache.branch_and_root(cp_length, index) assert branch == branch2 assert root == root2 -def test_merkle_cache_extension(): - source = Source(64) +@pytest.mark.asyncio +async def test_merkle_cache_extension(): + source = Source(64).hashes for length in range(14, 18): for cp_length 
in range(30, 36): - cache = MerkleCache(merkle, source, length) - cp_hashes = source.hashes(0, cp_length) + cache = MerkleCache(merkle, source) + await cache.initialize(length) + cp_hashes = await source(0, cp_length) # All possible indices for index in range(cp_length): # Compare correct answer with cache branch, root = merkle.branch_and_root(cp_hashes, index) - branch2, root2 = cache.branch_and_root(cp_length, index) + branch2, root2 = await cache.branch_and_root(cp_length, index) assert branch == branch2 assert root == root2 -def test_merkle_cache_truncation(): +@pytest.mark.asyncio +async def test_merkle_cache_truncation(): max_length = 33 - source = Source(max_length) + source = Source(max_length).hashes for length in range(max_length - 2, max_length + 1): for trunc_length in range(1, 20, 3): - cache = MerkleCache(merkle, source, length) + cache = MerkleCache(merkle, source) + await cache.initialize(length) cache.truncate(trunc_length) assert cache.length <= trunc_length for cp_length in range(1, length + 1, 3): - cp_hashes = source.hashes(0, cp_length) + cp_hashes = await source(0, cp_length) # All possible indices for index in range(cp_length): # Compare correct answer with cache branch, root = merkle.branch_and_root(cp_hashes, index) - branch2, root2 = cache.branch_and_root(cp_length, index) + branch2, root2 = await cache.branch_and_root(cp_length, + index) assert branch == branch2 assert root == root2 # Truncation is a no-op if longer - cache = MerkleCache(merkle, source, 10) + cache = MerkleCache(merkle, source) + await cache.initialize(10) level = cache.level.copy() for length in range(10, 13): cache.truncate(length) assert cache.level == level assert cache.length == 10 -def test_truncation_bad(): - cache = MerkleCache(merkle, Source(10), 10) + +@pytest.mark.asyncio +async def test_truncation_bad(): + cache = MerkleCache(merkle, Source(10).hashes) + await cache.initialize(10) with pytest.raises(TypeError): cache.truncate(1.0) for n in (-1, 0): @@ -222,43 
+233,48 @@ def test_truncation_bad(): cache.truncate(n) -def test_markle_cache_bad(): +@pytest.mark.asyncio +async def test_markle_cache_bad(): length = 23 - source = Source(length) - cache = MerkleCache(merkle, source, length) - cache.branch_and_root(5, 3) + source = Source(length).hashes + cache = MerkleCache(merkle, source) + await cache.initialize(length) + await cache.branch_and_root(5, 3) with pytest.raises(TypeError): - cache.branch_and_root(5.0, 3) + await cache.branch_and_root(5.0, 3) with pytest.raises(TypeError): - cache.branch_and_root(5, 3.0) + await cache.branch_and_root(5, 3.0) with pytest.raises(ValueError): - cache.branch_and_root(0, -1) + await cache.branch_and_root(0, -1) with pytest.raises(ValueError): - cache.branch_and_root(3, 3) + await cache.branch_and_root(3, 3) -def test_bad_extension(): +@pytest.mark.asyncio +async def test_bad_extension(): length = 5 - source = Source(length) - cache = MerkleCache(merkle, source, length) + source = Source(length).hashes + cache = MerkleCache(merkle, source) + await cache.initialize(length) level = cache.level.copy() with pytest.raises(AssertionError): - cache.branch_and_root(8, 0) + await cache.branch_and_root(8, 0) # The bad extension should not destroy the cache assert cache.level == level assert cache.length == length -def time_it(): - source = Source(500000) +async def time_it(): + source = Source(500000).hashes + cp_length = 492000 import time cache = MerkleCache(merkle, source) - cp_length = 492000 - cp_hashes = source.hashes(0, cp_length) + await cache.initialize(cp_length) + cp_hashes = await source(0, cp_length) brs2 = [] t1 = time.time() for index in range(5, 400000, 500): - brs2.append(cache.branch_and_root(cp_length, index)) + brs2.append(await cache.branch_and_root(cp_length, index)) t2 = time.time() print(t2 - t1) assert False From 955a8e927d6c5cfa447d9ddcf2c660a9eae9c190 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 6 Aug 2018 22:23:41 +0900 Subject: [PATCH 07/38] Put flushing-to-DB 
in a thread - flush() and backup_flush() are now async --- electrumx/server/block_processor.py | 89 ++++++++++++++++------------- 1 file changed, 50 insertions(+), 39 deletions(-) diff --git a/electrumx/server/block_processor.py b/electrumx/server/block_processor.py index 08a72a6..18b697a 100644 --- a/electrumx/server/block_processor.py +++ b/electrumx/server/block_processor.py @@ -56,7 +56,7 @@ class Prefetcher(object): if not await self._prefetch_blocks(): await asyncio.sleep(5) except DaemonError as e: - self.logger.info('ignoring daemon error: {}'.format(e)) + self.logger.info(f'ignoring daemon error: {e}') def get_prefetched_blocks(self): '''Called by block processor when it is processing queued blocks.''' @@ -183,10 +183,26 @@ class BlockProcessor(electrumx.server.db.DB): # is consistent with self.height self.state_lock = asyncio.Lock() - async def run_in_thread_shielded(self, func, *args): + async def run_in_thread_with_lock(self, func, *args): + # Run in a thread to prevent blocking. Shielded so that + # cancellations from shutdown don't lose work - when the task + # completes the data will be flushed and then we shut down. + # Take the state lock to be certain in-memory state is + # consistent and not being updated elsewhere. async with self.state_lock: return await asyncio.shield(run_in_thread(func, *args)) + async def _maybe_flush(self): + # If caught up, flush everything as client queries are + # performed on the DB. + if self._caught_up_event.is_set(): + await self.flush(True) + elif time.time() > self.next_cache_check: + flush_arg = self.check_cache_size() + if flush_arg is not None: + await self.flush(flush_arg) + self.next_cache_check = time.time() + 30 + async def check_and_advance_blocks(self, raw_blocks): '''Process the list of raw blocks passed. Detects and handles reorgs. 
@@ -201,7 +217,14 @@ class BlockProcessor(electrumx.server.db.DB): chain = [self.tip] + [self.coin.header_hash(h) for h in headers[:-1]] if hprevs == chain: - await self.run_in_thread_shielded(self.advance_blocks, blocks) + start = time.time() + await self.run_in_thread_with_lock(self.advance_blocks, blocks) + await self._maybe_flush() + if not self.first_sync: + s = '' if len(blocks) == 1 else 's' + self.logger.info('processed {:,d} block{} in {:.1f}s' + .format(len(blocks), s, + time.time() - start)) if self._caught_up_event.is_set(): await self.notifications.on_block(self.touched, self.height) self.touched = set() @@ -226,7 +249,7 @@ class BlockProcessor(electrumx.server.db.DB): self.logger.info('chain reorg detected') else: self.logger.info(f'faking a reorg of {count:,d} blocks') - await run_in_thread(self.flush, True) + await self.flush(True) async def get_raw_blocks(last_height, hex_hashes): heights = range(last_height, last_height - len(hex_hashes), -1) @@ -242,7 +265,8 @@ class BlockProcessor(electrumx.server.db.DB): hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)] for hex_hashes in chunks(hashes, 50): raw_blocks = await get_raw_blocks(last, hex_hashes) - await self.run_in_thread_shielded(self.backup_blocks, raw_blocks) + await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks) + await self.backup_flush() last -= len(raw_blocks) # Truncate header_mc: header count is 1 more than the height. self.header_mc.truncate(self.height + 1) @@ -312,14 +336,16 @@ class BlockProcessor(electrumx.server.db.DB): assert not self.db_deletes self.history.assert_flushed() - def flush(self, flush_utxos=False): + async def flush(self, flush_utxos): + if self.height == self.db_height: + self.assert_flushed() + else: + await self.run_in_thread_with_lock(self._flush_body, flush_utxos) + + def _flush_body(self, flush_utxos): '''Flush out cached state. History is always flushed. 
UTXOs are flushed if flush_utxos.''' - if self.height == self.db_height: - self.assert_flushed() - return - flush_start = time.time() last_flush = self.last_flush tx_diff = self.tx_count - self.last_flush_tx_count @@ -387,23 +413,25 @@ class BlockProcessor(electrumx.server.db.DB): self.tx_hashes = [] self.headers = [] - def backup_flush(self): + async def backup_flush(self): + assert self.height < self.db_height + assert not self.headers + assert not self.tx_hashes + self.history.assert_flushed() + await self.run_in_thread_with_lock(self._backup_flush_body) + + def _backup_flush_body(self): '''Like flush() but when backing up. All UTXOs are flushed. hashXs - sequence of hashXs which were touched by backing up. Searched for history entries to remove after the backup height. ''' - assert self.height < self.db_height - self.history.assert_flushed() - flush_start = time.time() # Backup FS (just move the pointers back) self.fs_height = self.height self.fs_tx_count = self.tx_count - assert not self.headers - assert not self.tx_hashes # Backup history. self.touched can include other addresses # which is harmless, but remove None. @@ -445,14 +473,14 @@ class BlockProcessor(electrumx.server.db.DB): # Flush history if it takes up over 20% of cache memory. # Flush UTXOs once they take up 80% of cache memory. if utxo_MB + hist_MB >= self.cache_MB or hist_MB >= self.cache_MB // 5: - self.flush(utxo_MB >= self.cache_MB * 4 // 5) + return utxo_MB >= self.cache_MB * 4 // 5 + return None def advance_blocks(self, blocks): '''Synchronously advance the blocks. It is already verified they correctly connect onto our tip. ''' - start = time.time() min_height = self.min_undo_height(self.daemon.cached_height()) height = self.height @@ -468,21 +496,6 @@ class BlockProcessor(electrumx.server.db.DB): self.headers.extend(headers) self.tip = self.coin.header_hash(headers[-1]) - # If caught up, flush everything as client queries are - # performed on the DB. 
- if self._caught_up_event.is_set(): - self.flush(True) - else: - if time.time() > self.next_cache_check: - self.check_cache_size() - self.next_cache_check = time.time() + 30 - - if not self.first_sync: - s = '' if len(blocks) == 1 else 's' - self.logger.info('processed {:,d} block{} in {:.1f}s' - .format(len(blocks), s, - time.time() - start)) - def advance_txs(self, txs): self.tx_hashes.append(b''.join(tx_hash for tx, tx_hash in txs)) @@ -555,7 +568,6 @@ class BlockProcessor(electrumx.server.db.DB): self.tx_counts.pop() self.logger.info('backed up to height {:,d}'.format(self.height)) - self.backup_flush() def backup_txs(self, txs): # Prevout values, in order down the block (coinbase first if present) @@ -756,7 +768,7 @@ class BlockProcessor(electrumx.server.db.DB): # Flush everything but with first_sync->False state. first_sync = self.first_sync self.first_sync = False - self.flush(True) + await self.flush(True) if first_sync: self.logger.info(f'{electrumx.version} synced to ' f'height {self.height:,d}') @@ -808,10 +820,9 @@ class BlockProcessor(electrumx.server.db.DB): await group.spawn(self.prefetcher.main_loop(self.height)) await group.spawn(self._process_prefetched_blocks()) finally: - async with self.state_lock: - # Shut down block processing - self.logger.info('flushing to DB for a clean shutdown...') - self.flush(True) + # Shut down block processing + self.logger.info('flushing to DB for a clean shutdown...') + await self.flush(True) def force_chain_reorg(self, count): '''Force a reorg of the given number of blocks. 
From 1badab2186bc956eff3ab8de0b8ecbb178a64b65 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Tue, 7 Aug 2018 09:57:35 +0900 Subject: [PATCH 08/38] Add documentation of proposed protocol 1.5 changes --- docs/protocol-changes.rst | 60 +++++++ docs/protocol-methods.rst | 344 +++++++++++++++++++++----------------- docs/protocol-removed.rst | 161 ++++++++++++++++++ 3 files changed, 416 insertions(+), 149 deletions(-) diff --git a/docs/protocol-changes.rst b/docs/protocol-changes.rst index fc0a4fe..7c0bcdd 100644 --- a/docs/protocol-changes.rst +++ b/docs/protocol-changes.rst @@ -146,3 +146,63 @@ Removed methods * :func:`blockchain.block.get_header` * :func:`blockchain.block.get_chunk` + +Version 1.5 +=========== + +This protocol version makes changes intended to allow clients and +servers to more easily scale to support queries about busy addresses. +It has changes to reduce the number of round-trip queries made in +common usage, and to make results more compact to reduce bandwidth +consumption. + +RPC calls with potentially large responses have pagination support, +and the return value of :func:`blockchain.scripthash.subscribe` +changes. Script hash :ref:`status <status>` had to be recalculated +with each new transaction and was undefined if it included more than +one mempool transaction. Its calculation is linear in history length +resulting in quadratic complexity as history grows. Its calculation +for large histories was demanding for both the server to compute and +the client to check. + +RPC calls and notifications that combined the effects of the mempool +and confirmed history are removed. + +The changes are beneficial to clients and servers alike, but will +require changes to both client-side and server-side logic.
In +particular, the client should track what block (by hash and height) +wallet data is synchronized to, and if that hash is no longer part of +the main chain, it will need to remove wallet data for blocks that +were reorganized away and get updated information as of the first +reorganized block. The effects are limited to script hashes +potentially affected by the reorg, and for most clients this will be +the empty set. + +New methods +----------- + + * :func:`blockchain.scripthash.history` + * :func:`blockchain.scripthash.utxos` + +New notifications +----------------- + + * :func:`mempool.changes` + +Changes +------- + + * :func:`blockchain.scripthash.subscribe` has changed its return value + and the notifications it sends + * :func:`blockchain.transaction.get` takes an additional optional + argument *merkle* + +Removed methods +--------------- + + * :func:`blockchain.scripthash.get_history`. Switch to + :func:`blockchain.scripthash.history` + * :func:`blockchain.scripthash.get_mempool`. Switch to + handling :func:`mempool.changes` notifications + * :func:`blockchain.scripthash.listunspent`. Switch to + :func:`blockchain.scripthash.utxos` diff --git a/docs/protocol-methods.rst b/docs/protocol-methods.rst index ee10416..bbfda31 100644 --- a/docs/protocol-methods.rst +++ b/docs/protocol-methods.rst @@ -310,166 +310,68 @@ Return the confirmed and unconfirmed balances of a :ref:`script hash "unconfirmed": "0.236844" } -blockchain.scripthash.get_history -================================= +blockchain.scripthash.history +============================= -Return the confirmed and unconfirmed history of a :ref:`script hash -