From 1ef6a4d7852020762ba3aaa26cf034d0ad9c8aca Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sun, 11 Dec 2016 10:32:51 +0900 Subject: [PATCH] Split mempool out into new file. Rework mempool handling --- lib/util.py | 10 ++ server/block_processor.py | 13 +- server/mempool.py | 287 ++++++++++++++++++++++++++++++++++++++ server/protocol.py | 267 +++++++---------------------------- 4 files changed, 354 insertions(+), 223 deletions(-) create mode 100644 server/mempool.py diff --git a/lib/util.py b/lib/util.py index de0d2f7..7cbae72 100644 --- a/lib/util.py +++ b/lib/util.py @@ -9,6 +9,7 @@ import array +import asyncio import inspect import logging import sys @@ -132,3 +133,12 @@ def increment_byte_string(bs): # This can only happen if all characters are 0xff bs = bytes([1]) + bs return bytes(bs) + +async def asyncio_clean_shutdown(loop=None): + while True: + pending_tasks = [task for task in asyncio.Task.all_tasks(loop) + if not task.done()] + if len(pending_tasks) <= 1: + break + await asyncio.sleep(0) + await asyncio.sleep(1) diff --git a/server/block_processor.py b/server/block_processor.py index b4f749c..50d3d7c 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -132,11 +132,9 @@ class BlockProcessor(server.db.DB): Coordinate backing up in case of chain reorganisations. ''' - def __init__(self, client, env): + def __init__(self, env, touched, touched_event): super().__init__(env) - self.client = client - # The block processor reads its tasks from this queue self.tasks = asyncio.Queue() @@ -151,6 +149,8 @@ class BlockProcessor(server.db.DB): self.caught_up = False self._shutdown = False self.event = asyncio.Event() + self.touched = touched + self.touched_event = touched_event # Meta self.utxo_MB = env.utxo_MB @@ -218,9 +218,8 @@ class BlockProcessor(server.db.DB): for block in blocks: if self._shutdown: break - self.advance_block(block, touched) + self.advance_block(block, self.touched) - touched = set() loop = asyncio.get_event_loop() try: if self.caught_up: @@ -228,14 +227,14 @@ class BlockProcessor(server.db.DB): else: do_it() except ChainReorg: - await self.handle_chain_reorg(touched) + await self.handle_chain_reorg(self.touched) if self.caught_up: # Flush everything as queries are performed on the DB and # not in-memory. await asyncio.sleep(0) self.flush(True) - self.client.notify(touched) + self.touched_event.set() elif time.time() > self.next_cache_check: self.check_cache_size() self.next_cache_check = time.time() + 60 diff --git a/server/mempool.py b/server/mempool.py new file mode 100644 index 0000000..325ea7b --- /dev/null +++ b/server/mempool.py @@ -0,0 +1,287 @@ +# Copyright (c) 2016, Neil Booth +# +# All rights reserved. +# +# See the file "LICENCE" for information about the copyright +# and warranty status of this software. + +'''Mempool handling.''' + +import asyncio +import itertools +import time +from collections import defaultdict +from functools import partial + +from lib.hash import hash_to_str, hex_str_to_hash +from lib.tx import Deserializer +import lib.util as util +from server.daemon import DaemonError + + +class MemPool(util.LoggedClass): + '''Representation of the daemon's mempool. + + Updated regularly in caught-up state. Goal is to enable efficient + response to the value() and transactions() calls. + + To that end we maintain the following maps: + + tx_hash -> (txin_pairs, txout_pairs) + hash168 -> set of all tx hashes in which the hash168 appears + + A pair is a (hash168, value) tuple. tx hashes are hex strings. + ''' + + def __init__(self, daemon, coin, db, touched, touched_event): + super().__init__() + self.daemon = daemon + self.coin = coin + self.db = db + self.touched = touched + self.touched_event = touched_event + self.stop = False + self.txs = {} + self.hash168s = defaultdict(set) # None can be a key + + async def main_loop(self, caught_up): + '''Asynchronously maintain mempool status with daemon. + + Waits until the caught up event is signalled.''' + await caught_up.wait() + self.logger.info('beginning processing of daemon mempool. ' + 'This can take some time...') + try: + await self.fetch_and_process() + except asyncio.CancelledError: + # This aids clean shutdowns + self.stop = True + + async def fetch_and_process(self): + '''The inner loop unprotected by try / except.''' + unfetched = set() + unprocessed = {} + log_every = 150 + log_secs = 0 + fetch_size = 400 + process_some = self.async_process_some(unfetched, fetch_size // 2) + next_refresh = 0 + # The list of mempool hashes is fetched no more frequently + # than this number of seconds + refresh_secs = 5 + + while True: + try: + now = time.time() + if now >= next_refresh: + await self.new_hashes(unprocessed, unfetched) + next_refresh = now + refresh_secs + log_secs -= refresh_secs + + # Fetch some txs if unfetched ones remain + if unfetched: + count = min(len(unfetched), fetch_size) + hex_hashes = [unfetched.pop() for n in range(count)] + unprocessed.update(await self.fetch_raw_txs(hex_hashes)) + + # Process some txs if unprocessed ones remain + if unprocessed: + await process_some(unprocessed) + + if self.touched: + self.touched_event.set() + + if log_secs <= 0 and not unprocessed: + log_secs = log_every + self.logger.info('{:,d} txs touching {:,d} addresses' + .format(len(self.txs), + len(self.hash168s))) + await asyncio.sleep(1) + except DaemonError as e: + self.logger.info('ignoring daemon error: {}'.format(e)) + + async def new_hashes(self, unprocessed, unfetched): + '''Get the current list of hashes in the daemon's mempool. + + Remove ones that have disappeared from self.txs and unprocessed. + ''' + txs = self.txs + hash168s = self.hash168s + touched = self.touched + + hashes = set(await self.daemon.mempool_hashes()) + new = hashes.difference(txs) + gone = set(txs).difference(hashes) + for hex_hash in gone: + unprocessed.pop(hex_hash, None) + item = txs.pop(hex_hash) + if item: + txin_pairs, txout_pairs = item + tx_hash168s = set(hash168 for hash168, value in txin_pairs) + tx_hash168s.update(hash168 for hash168, value in txout_pairs) + for hash168 in tx_hash168s: + hash168s[hash168].remove(hex_hash) + if not hash168s[hash168]: + del hash168s[hash168] + touched.update(tx_hash168s) + + unfetched.update(new) + for hex_hash in new: + txs[hex_hash] = None + + def async_process_some(self, unfetched, limit): + loop = asyncio.get_event_loop() + pending = [] + txs = self.txs + + async def process(unprocessed): + nonlocal pending + + raw_txs = {} + while unprocessed and len(raw_txs) < limit: + hex_hash, raw_tx = unprocessed.popitem() + raw_txs[hex_hash] = raw_tx + + if unprocessed: + deferred = [] + else: + deferred = pending + pending = [] + + process_raw_txs = partial(self.process_raw_txs, raw_txs, deferred) + result, deferred = ( + await loop.run_in_executor(None, process_raw_txs)) + + pending.extend(deferred) + hash168s = self.hash168s + touched = self.touched + for hex_hash, in_out_pairs in result.items(): + if hex_hash in txs: + txs[hex_hash] = in_out_pairs + for hash168, value in itertools.chain(*in_out_pairs): + touched.add(hash168) + hash168s[hash168].add(hex_hash) + + to_do = len(unfetched) + len(unprocessed) + if to_do: + percent = (len(txs) - to_do) * 100 // len(txs) + self.logger.info('catchup {:d}% complete'.format(percent)) + + return process + + async def fetch_raw_txs(self, hex_hashes): + '''Fetch a list of mempool transactions.''' + raw_txs = await self.daemon.getrawtransactions(hex_hashes) + + # Skip hashes the daemon has dropped. Either they were + # evicted or they got in a block. + return {hh:raw for hh, raw in zip(hex_hashes, raw_txs) if raw} + + def process_raw_txs(self, raw_tx_map, pending): + '''Process the dictionary of raw transactions and return a dictionary + of updates to apply to self.txs. + + This runs in the executor so should not update any member + variables it doesn't own. Atomic reads of self.txs that do + not depend on the result remaining the same are fine. + ''' + script_hash168 = self.coin.hash168_from_script() + db_utxo_lookup = self.db.db_utxo_lookup + txs = self.txs + + # Deserialize each tx and put it in our priority queue + for tx_hash, raw_tx in raw_tx_map.items(): + if not tx_hash in txs: + continue + tx = Deserializer(raw_tx).read_tx() + + # Convert the tx outputs into (hash168, value) pairs + txout_pairs = [(script_hash168(txout.pk_script), txout.value) + for txout in tx.outputs] + + # Convert the tx inputs to ([prev_hex_hash, prev_idx) pairs + txin_pairs = [(hash_to_str(txin.prev_hash), txin.prev_idx) + for txin in tx.inputs] + + pending.append((tx_hash, txin_pairs, txout_pairs)) + + # Now process what we can + result = {} + deferred = [] + + for item in pending: + if self.stop: + break + + tx_hash, old_txin_pairs, txout_pairs = item + if tx_hash not in txs: + continue + + mempool_missing = False + txin_pairs = [] + + try: + for prev_hex_hash, prev_idx in old_txin_pairs: + tx_info = txs.get(prev_hex_hash, 0) + if tx_info is None: + tx_info = result.get(prev_hex_hash) + if not tx_info: + mempool_missing = True + continue + if tx_info: + txin_pairs.append(tx_info[1][prev_idx]) + elif not mempool_missing: + prev_hash = hex_str_to_hash(prev_hex_hash) + txin_pairs.append(db_utxo_lookup(prev_hash, prev_idx)) + except self.db.MissingUTXOError: + # This typically happens just after the daemon has + # accepted a new block and the new mempool has deps on + # new txs in that block. + continue + + if mempool_missing: + deferred.append(item) + else: + result[tx_hash] = (txin_pairs, txout_pairs) + + return result, deferred + + async def transactions(self, hash168): + '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool + entries for the hash168. + + unconfirmed is True if any txin is unconfirmed. + ''' + # hash168s is a defaultdict + if not hash168 in self.hash168s: + return [] + + hex_hashes = self.hash168s[hash168] + raw_txs = self.bp.daemon.getrawtransactions(hex_hashes) + result = [] + for hex_hash, raw_tx in zip(hex_hashes, raw_txs): + item = self.txs.get(hex_hash) + if not item or not raw_tx: + continue + tx = Deserializer(raw_tx).read_tx() + txin_pairs, txout_pairs = item + tx_fee = (sum(v for hash168, v in txin_pairs) + - sum(v for hash168, v in txout_pairs)) + unconfirmed = any(txin.prev_hash not in self.txs + for txin in tx.inputs) + result.append((hex_hash, tx_fee, unconfirmed)) + return result + + def value(self, hash168): + '''Return the unconfirmed amount in the mempool for hash168. + + Can be positive or negative. + ''' + value = 0 + # hash168s is a defaultdict + if hash168 in self.hash168s: + for hex_hash in self.hash168s[hash168]: + txin_pairs, txout_pairs = self.txs[hex_hash] + value -= sum(v for h168, v in txin_pairs if h168 == hash168) + value += sum(v for h168, v in txout_pairs if h168 == hash168) + return value diff --git a/server/protocol.py b/server/protocol.py index 961cf42..6ac272c 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -22,192 +22,14 @@ import pylru from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash from lib.jsonrpc import JSONRPC, RequestBase -from lib.tx import Deserializer import lib.util as util from server.block_processor import BlockProcessor from server.daemon import DaemonError from server.irc import IRC +from server.mempool import MemPool from server.version import VERSION -class MemPool(util.LoggedClass): - '''Representation of the daemon's mempool. - - Updated regularly in caught-up state. Goal is to enable efficient - response to the value() and transactions() calls. - - To that end we maintain the following maps: - - tx_hash -> [txin_pairs, txout_pairs, unconfirmed] - hash168 -> set of all tx hashes in which the hash168 appears - - A pair is a (hash168, value) tuple. Unconfirmed is true if any of the - tx's txins are unconfirmed. tx hashes are hex strings. - ''' - - def __init__(self, daemon, coin, db, manager): - super().__init__() - self.daemon = daemon - self.coin = coin - self.db = db - self.manager = manager - self.txs = {} - self.hash168s = defaultdict(set) # None can be a key - self.count = -1 - - async def main_loop(self, caught_up): - '''Asynchronously maintain mempool status with daemon. - - Waits until the caught up event is signalled.''' - await caught_up.wait() - self.logger.info('maintaining state with daemon...') - while True: - try: - await self.update() - await asyncio.sleep(5) - except DaemonError as e: - self.logger.info('ignoring daemon error: {}'.format(e)) - - async def update(self): - '''Update state given the current mempool to the passed set of hashes. - - Remove transactions that are no longer in our mempool. - Request new transactions we don't have then add to our mempool. - ''' - hex_hashes = set(await self.daemon.mempool_hashes()) - touched = set() - missing_utxos = [] - - initial = self.count < 0 - if initial: - self.logger.info('beginning import of {:,d} mempool txs' - .format(len(hex_hashes))) - - # Remove gone items - gone = set(self.txs).difference(hex_hashes) - for hex_hash in gone: - txin_pairs, txout_pairs, unconfirmed = self.txs.pop(hex_hash) - hash168s = set(hash168 for hash168, value in txin_pairs) - hash168s.update(hash168 for hash168, value in txout_pairs) - for hash168 in hash168s: - self.hash168s[hash168].remove(hex_hash) - if not self.hash168s[hash168]: - del self.hash168s[hash168] - touched.update(hash168s) - - # Get the raw transactions for the new hashes. Ignore the - # ones the daemon no longer has (it will return None). Put - # them into a dictionary of hex hash to deserialized tx. - hex_hashes.difference_update(self.txs) - raw_txs = await self.daemon.getrawtransactions(hex_hashes) - if initial: - self.logger.info('analysing {:,d} mempool txs' - .format(len(raw_txs))) - new_txs = {hex_hash: Deserializer(raw_tx).read_tx() - for hex_hash, raw_tx in zip(hex_hashes, raw_txs) if raw_tx} - del raw_txs, hex_hashes - - # The mempool is unordered, so process all outputs first so - # that looking for inputs has full info. - script_hash168 = self.coin.hash168_from_script() - db_utxo_lookup = self.db.db_utxo_lookup - - def txout_pair(txout): - return (script_hash168(txout.pk_script), txout.value) - - for n, (hex_hash, tx) in enumerate(new_txs.items()): - # Yield to process e.g. signals - if n % 20 == 0: - await asyncio.sleep(0) - txout_pairs = [txout_pair(txout) for txout in tx.outputs] - self.txs[hex_hash] = (None, txout_pairs, None) - - def txin_info(txin): - hex_hash = hash_to_str(txin.prev_hash) - mempool_entry = self.txs.get(hex_hash) - if mempool_entry: - return mempool_entry[1][txin.prev_idx], True - pair = db_utxo_lookup(txin.prev_hash, txin.prev_idx) - return pair, False - - if initial: - next_log = time.time() - self.logger.info('processed outputs, now examining inputs. ' - 'This can take some time...') - - # Now add the inputs - for n, (hex_hash, tx) in enumerate(new_txs.items()): - # Yield to process e.g. signals - await asyncio.sleep(0) - - if initial and time.time() > next_log: - next_log = time.time() + 20 - self.logger.info('{:,d} done ({:d}%)' - .format(n, int(n / len(new_txs) * 100))) - - txout_pairs = self.txs[hex_hash][1] - try: - infos = (txin_info(txin) for txin in tx.inputs) - txin_pairs, unconfs = zip(*infos) - except self.db.MissingUTXOError: - # Drop this TX. If other mempool txs depend on it - # it's harmless - next time the mempool is refreshed - # they'll either be cleaned up or the UTXOs will no - # longer be missing. - del self.txs[hex_hash] - continue - self.txs[hex_hash] = (txin_pairs, txout_pairs, any(unconfs)) - - # Update touched and self.hash168s for the new tx - for hash168, value in txin_pairs: - self.hash168s[hash168].add(hex_hash) - touched.add(hash168) - for hash168, value in txout_pairs: - self.hash168s[hash168].add(hex_hash) - touched.add(hash168) - - if missing_utxos: - self.logger.info('{:,d} txs had missing UTXOs; probably the ' - 'daemon is a block or two ahead of us.' - .format(len(missing_utxos))) - first = ', '.join('{} / {:,d}'.format(hash_to_str(txin.prev_hash), - txin.prev_idx) - for txin in sorted(missing_utxos)[:3]) - self.logger.info('first ones are {}'.format(first)) - - self.count += 1 - if self.count % 25 == 0 or gone: - self.count = 0 - self.logger.info('{:,d} txs touching {:,d} addresses' - .format(len(self.txs), len(self.hash168s))) - - self.manager.notify(touched) - - def transactions(self, hash168): - '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool - entries for the hash168. - - unconfirmed is True if any txin is unconfirmed. - ''' - for hex_hash in self.hash168s[hash168]: - txin_pairs, txout_pairs, unconfirmed = self.txs[hex_hash] - tx_fee = (sum(v for hash168, v in txin_pairs) - - sum(v for hash168, v in txout_pairs)) - yield (hex_hash, tx_fee, unconfirmed) - - def value(self, hash168): - '''Return the unconfirmed amount in the mempool for hash168. - - Can be positive or negative. - ''' - value = 0 - for hex_hash in self.hash168s[hash168]: - txin_pairs, txout_pairs, unconfirmed = self.txs[hex_hash] - value -= sum(v for h168, v in txin_pairs if h168 == hash168) - value += sum(v for h168, v in txout_pairs if h168 == hash168) - return value - - class ServerManager(util.LoggedClass): '''Manages the client servers, a mempool, and a block processor. @@ -229,9 +51,13 @@ class ServerManager(util.LoggedClass): def __init__(self, env): super().__init__() + self.loop = asyncio.get_event_loop() + self.touched = set() + self.touched_event = asyncio.Event() self.start = time.time() - self.bp = BlockProcessor(self, env) - self.mempool = MemPool(self.bp.daemon, env.coin, self.bp, self) + self.bp = BlockProcessor(env, self.touched, self.touched_event) + self.mempool = MemPool(self.bp.daemon, env.coin, self.bp, + self.touched, self.touched_event) self.irc = IRC(env) self.env = env self.servers = [] @@ -261,13 +87,13 @@ class ServerManager(util.LoggedClass): self.logger.info('max subscriptions per session: {:,d}' .format(env.max_session_subs)) - def mempool_transactions(self, hash168): + async def mempool_transactions(self, hash168): '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool entries for the hash168. unconfirmed is True if any txin is unconfirmed. ''' - return self.mempool.transactions(hash168) + return await self.mempool.transactions(hash168) def mempool_value(self, hash168): '''Return the unconfirmed amount in the mempool for hash168. @@ -343,10 +169,11 @@ class ServerManager(util.LoggedClass): # shutdown() assumes bp.main_loop() is first add_future(self.bp.main_loop()) add_future(self.bp.prefetcher.main_loop()) - add_future(self.mempool.main_loop(self.bp.event)) add_future(self.irc.start(self.bp.event)) add_future(self.start_servers(self.bp.event)) + add_future(self.mempool.main_loop(self.bp.event)) add_future(self.enqueue_delayed_sessions()) + add_future(self.notify()) for n in range(4): add_future(self.serve_requests()) @@ -356,12 +183,12 @@ class ServerManager(util.LoggedClass): except asyncio.CancelledError: break await self.shutdown() + await util.asyncio_clean_shutdown() async def start_server(self, kind, *args, **kw_args): - loop = asyncio.get_event_loop() protocol_class = LocalRPC if kind == 'RPC' else ElectrumX protocol = partial(protocol_class, self, self.bp, self.env, kind) - server = loop.create_server(protocol, *args, **kw_args) + server = self.loop.create_server(protocol, *args, **kw_args) host, port = args[:2] try: @@ -395,27 +222,34 @@ class ServerManager(util.LoggedClass): sslc.load_cert_chain(env.ssl_certfile, keyfile=env.ssl_keyfile) await self.start_server('SSL', env.host, env.ssl_port, ssl=sslc) - def notify(self, touched): + async def notify(self): '''Notify sessions about height changes and touched addresses.''' - # Invalidate caches - hc = self.history_cache - for hash168 in set(hc).intersection(touched): - del hc[hash168] - if self.bp.db_height != self.height: - self.height = self.bp.db_height - self.header_cache.clear() + while True: + await self.touched_event.wait() + touched = self.touched.copy() + self.touched.clear() + self.touched_event.clear() - for session in self.sessions: - if isinstance(session, ElectrumX): - request = self.NotificationRequest(self.bp.db_height, touched) - session.enqueue_request(request) - # Periodically log sessions - if self.env.log_sessions and time.time() > self.next_log_sessions: - data = self.session_data(for_log=True) - for line in ServerManager.sessions_text_lines(data): - self.logger.info(line) - self.logger.info(json.dumps(self.server_summary())) - self.next_log_sessions = time.time() + self.env.log_sessions + # Invalidate caches + hc = self.history_cache + for hash168 in set(hc).intersection(touched): + del hc[hash168] + if self.bp.db_height != self.height: + self.height = self.bp.db_height + self.header_cache.clear() + + for session in self.sessions: + if isinstance(session, ElectrumX): + request = self.NotificationRequest(self.bp.db_height, + touched) + session.enqueue_request(request) + # Periodically log sessions + if self.env.log_sessions and time.time() > self.next_log_sessions: + data = self.session_data(for_log=True) + for line in ServerManager.sessions_text_lines(data): + self.logger.info(line) + self.logger.info(json.dumps(self.server_summary())) + self.next_log_sessions = time.time() + self.env.log_sessions def electrum_header(self, height): '''Return the binary header at the given height.''' @@ -457,8 +291,6 @@ class ServerManager(util.LoggedClass): server.close() await server.wait_closed() self.servers = [] # So add_session closes new sessions - while not all(future.done() for future in self.futures): - await asyncio.sleep(0) if self.sessions: await self.close_sessions() @@ -474,7 +306,6 @@ class ServerManager(util.LoggedClass): await asyncio.sleep(2) self.logger.info('{:,d} sessions remaining' .format(len(self.sessions))) - await asyncio.sleep(1) def add_session(self, session): # Some connections are acknowledged after the servers are closed @@ -491,9 +322,12 @@ class ServerManager(util.LoggedClass): .format(session.peername(), len(self.sessions))) def remove_session(self, session): - group = self.sessions.pop(session) - group.remove(session) - self.subscription_count -= session.sub_count() + # This test should always be True. However if a bug messes + # things up it prevents consequent log noise + if session in self.sessions: + group = self.sessions.pop(session) + group.remove(session) + self.subscription_count -= session.sub_count() def close_session(self, session): '''Close the session's transport and cancel its future.''' @@ -762,6 +596,7 @@ class Session(JSONRPC): self.log_error('error handling request {}'.format(request)) traceback.print_exc() errs.append(request) + await asyncio.sleep(0) if total >= 8: break @@ -905,7 +740,7 @@ class ElectrumX(Session): # Note history is ordered and mempool unordered in electrum-server # For mempool, height is -1 if unconfirmed txins, otherwise 0 history = await self.manager.async_get_history(hash168) - mempool = self.manager.mempool_transactions(hash168) + mempool = await self.manager.mempool_transactions(hash168) status = ''.join('{}:{:d}:'.format(hash_to_str(tx_hash), height) for tx_hash, height in history) @@ -940,10 +775,10 @@ class ElectrumX(Session): return {"block_height": height, "merkle": merkle_branch, "pos": pos} - def unconfirmed_history(self, hash168): + async def unconfirmed_history(self, hash168): # Note unconfirmed history is unordered in electrum-server # Height is -1 if unconfirmed txins, otherwise 0 - mempool = self.manager.mempool_transactions(hash168) + mempool = await self.manager.mempool_transactions(hash168) return [{'tx_hash': tx_hash, 'height': -unconfirmed, 'fee': fee} for tx_hash, fee, unconfirmed in mempool] @@ -953,7 +788,7 @@ class ElectrumX(Session): conf = [{'tx_hash': hash_to_str(tx_hash), 'height': height} for tx_hash, height in history] - return conf + self.unconfirmed_history(hash168) + return conf + await self.unconfirmed_history(hash168) def get_chunk(self, index): '''Return header chunk as hex. Index is a non-negative integer.''' @@ -995,7 +830,7 @@ class ElectrumX(Session): async def address_get_mempool(self, params): hash168 = self.params_to_hash168(params) - return self.unconfirmed_history(hash168) + return await self.unconfirmed_history(hash168) async def address_get_proof(self, params): hash168 = self.params_to_hash168(params)