From 12dbf2c74a3429c3c30b4cd89fe2775a9bf0d84a Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 6 Aug 2018 20:46:09 +0900 Subject: [PATCH] Rename get_history to limited_history - make it async and run in a thread --- contrib/query.py | 4 ++-- electrumx/server/chain_state.py | 19 +++---------------- electrumx/server/db.py | 20 ++++++++++++-------- electrumx/server/session.py | 14 ++++++++++---- 4 files changed, 27 insertions(+), 30 deletions(-) diff --git a/contrib/query.py b/contrib/query.py index 1a85dd7..60f1696 100755 --- a/contrib/query.py +++ b/contrib/query.py @@ -73,8 +73,8 @@ async def query(args): if not hashX: continue n = None - for n, (tx_hash, height) in enumerate(db.get_history(hashX, limit), - start=1): + history = await db.limited_history(hashX, limit=limit) + for n, (tx_hash, height) in enumerate(history, start=1): print(f'History #{n:,d}: height {height:,d} ' f'tx_hash {hash_to_hex_str(tx_hash)}') if n is None: diff --git a/electrumx/server/chain_state.py b/electrumx/server/chain_state.py index b950bf4..d9d6fb2 100644 --- a/electrumx/server/chain_state.py +++ b/electrumx/server/chain_state.py @@ -8,8 +8,6 @@ import asyncio -from aiorpcx import run_in_thread - from electrumx.lib.hash import hash_to_hex_str @@ -28,6 +26,7 @@ class ChainState(object): self.tx_branch_and_root = self._bp.merkle.branch_and_root self.read_headers = self._bp.read_headers self.all_utxos = self._bp.all_utxos + self.limited_history = self._bp.limited_history async def broadcast_transaction(self, raw_tx): return await self._daemon.sendrawtransaction([raw_tx]) @@ -46,18 +45,6 @@ class ChainState(object): 'db_height': self.db_height(), } - async def get_history(self, hashX): - '''Get history asynchronously to reduce latency.''' - def job(): - # History DoS limit. Each element of history is about 99 - # bytes when encoded as JSON. This limits resource usage - # on bloated history requests, and uses a smaller divisor - # so large requests are logged before refusing them. - limit = self._env.max_send // 97 - return list(self._bp.get_history(hashX, limit=limit)) - - return await run_in_thread(job) - def header_branch_and_root(self, length, height): return self._bp.header_mc.branch_and_root(length, height) @@ -102,8 +89,8 @@ class ChainState(object): if not hashX: continue n = None - for n, (tx_hash, height) in enumerate( - db.get_history(hashX, limit), start=1): + history = await db.limited_history(hashX, limit=limit) + for n, (tx_hash, height) in enumerate(history): lines.append(f'History #{n:,d}: height {height:,d} ' f'tx_hash {hash_to_hex_str(tx_hash)}') if n is None: diff --git a/electrumx/server/db.py b/electrumx/server/db.py index c288ff3..3eeadb4 100644 --- a/electrumx/server/db.py +++ b/electrumx/server/db.py @@ -227,15 +227,19 @@ class DB(object): return [self.coin.header_hash(header) for header in headers] - def get_history(self, hashX, limit=1000): - '''Generator that returns an unpruned, sorted list of (tx_hash, - height) tuples of confirmed transactions that touched the address, - earliest in the blockchain first. Includes both spending and - receiving transactions. By default yields at most 1000 entries. - Set limit to None to get them all. + async def limited_history(self, hashX, *, limit=1000): + '''Return an unpruned, sorted list of (tx_hash, height) tuples of + confirmed transactions that touched the address, earliest in + the blockchain first. Includes both spending and receiving + transactions. By default returns at most 1000 entries. Set + limit to None to get them all. ''' - for tx_num in self.history.get_txnums(hashX, limit): - yield self.fs_tx_hash(tx_num) + def read_history(): + tx_nums = list(self.history.get_txnums(hashX, limit)) + fs_tx_hash = self.fs_tx_hash + return [fs_tx_hash(tx_num) for tx_num in tx_nums] + + return await run_in_thread(read_history) # -- Undo information diff --git a/electrumx/server/session.py b/electrumx/server/session.py index d5d89eb..e450108 100644 --- a/electrumx/server/session.py +++ b/electrumx/server/session.py @@ -444,11 +444,17 @@ class SessionManager(object): '''The number of connections that we've sent something to.''' return len(self.sessions) - async def get_history(self, hashX): + async def limited_history(self, hashX): '''A caching layer.''' hc = self._history_cache if hashX not in hc: - hc[hashX] = await self.chain_state.get_history(hashX) + # History DoS limit. Each element of history is about 99 + # bytes when encoded as JSON. This limits resource usage + # on bloated history requests, and uses a smaller divisor + # so large requests are logged before refusing them. + limit = self.env.max_send // 97 + hc[hashX] = await self.chain_state.limited_history(hashX, + limit=limit) return hc[hashX] async def _notify_sessions(self, height, touched): @@ -773,7 +779,7 @@ class ElectrumX(SessionBase): ''' # Note history is ordered and mempool unordered in electrum-server # For mempool, height is -1 if unconfirmed txins, otherwise 0 - history = await self.session_mgr.get_history(hashX) + history = await self.session_mgr.limited_history(hashX) mempool = await self.mempool.transaction_summaries(hashX) status = ''.join('{}:{:d}:'.format(hash_to_hex_str(tx_hash), height) @@ -873,7 +879,7 @@ class ElectrumX(SessionBase): async def confirmed_and_unconfirmed_history(self, hashX): # Note history is ordered but unconfirmed is unordered in e-s - history = await self.session_mgr.get_history(hashX) + history = await self.session_mgr.limited_history(hashX) conf = [{'tx_hash': hash_to_hex_str(tx_hash), 'height': height} for tx_hash, height in history] return conf + await self.unconfirmed_history(hashX)