diff --git a/electrumx/server/chain_state.py b/electrumx/server/chain_state.py index 8681c1b..38b4e7b 100644 --- a/electrumx/server/chain_state.py +++ b/electrumx/server/chain_state.py @@ -7,7 +7,6 @@ import asyncio -import pylru from aiorpcx import run_in_thread @@ -19,24 +18,15 @@ class ChainState(object): blocks, transaction history, UTXOs and the mempool. ''' - def __init__(self, env, daemon, bp, notifications): + def __init__(self, env, daemon, bp): self._env = env self._daemon = daemon self._bp = bp - self._history_cache = pylru.lrucache(256) # External interface pass-throughs for session.py self.force_chain_reorg = self._bp.force_chain_reorg self.tx_branch_and_root = self._bp.merkle.branch_and_root self.read_headers = self._bp.read_headers - # Cache maintenance - notifications.add_callback(self._notify) - - async def _notify(self, height, touched): - # Invalidate our history cache for touched hashXs - hc = self._history_cache - for hashX in set(hc).intersection(touched): - del hc[hashX] async def broadcast_transaction(self, raw_tx): return await self._daemon.sendrawtransaction([raw_tx]) @@ -65,10 +55,7 @@ class ChainState(object): limit = self._env.max_send // 97 return list(self._bp.get_history(hashX, limit=limit)) - hc = self._history_cache - if hashX not in hc: - hc[hashX] = await run_in_thread(job) - return hc[hashX] + return await run_in_thread(job) async def get_utxos(self, hashX): '''Get UTXOs asynchronously to reduce latency.''' diff --git a/electrumx/server/controller.py b/electrumx/server/controller.py index 051bed2..da07089 100644 --- a/electrumx/server/controller.py +++ b/electrumx/server/controller.py @@ -96,7 +96,7 @@ class Controller(ServerBase): BlockProcessor = env.coin.BLOCK_PROCESSOR bp = BlockProcessor(env, daemon, notifications) mempool = MemPool(env.coin, daemon, notifications, bp.lookup_utxos) - chain_state = ChainState(env, daemon, bp, notifications) + chain_state = ChainState(env, daemon, bp) session_mgr = SessionManager(env, chain_state, mempool, notifications, shutdown_event) diff --git a/electrumx/server/session.py b/electrumx/server/session.py index 1dc778e..daf97cd 100644 --- a/electrumx/server/session.py +++ b/electrumx/server/session.py @@ -13,6 +13,7 @@ import datetime import itertools import json import os +import pylru import ssl import time from collections import defaultdict @@ -119,6 +120,8 @@ class SessionManager(object): self.state = self.CATCHING_UP self.txs_sent = 0 self.start_time = time.time() + self._history_cache = pylru.lrucache(256) + self._hc_height = 0 # Cache some idea of room to avoid recounting on each subscription self.subs_room = 0 # Masternode stuff only for such coins @@ -442,8 +445,24 @@ class SessionManager(object): '''The number of connections that we've sent something to.''' return len(self.sessions) + async def get_history(self, hashX): + '''A caching layer.''' + hc = self._history_cache + if hashX not in hc: + hc[hashX] = await self.chain_state.get_history(hashX) + return hc[hashX] + async def _notify_sessions(self, height, touched): '''Notify sessions about height changes and touched addresses.''' + # Invalidate our history cache for touched hashXs + if height != self._hc_height: + self._hc_height = height + hc = self._history_cache + hashXs = set(hc).intersection(touched) + text = [hash_to_hex_str(hashX) for hashX in hashXs] + for hashX in hashXs: + del hc[hashX] + async with TaskGroup() as group: for session in self.sessions: await group.spawn(session.notify(height, touched)) @@ -755,7 +774,7 @@ class ElectrumX(SessionBase): ''' # Note history is ordered and mempool unordered in electrum-server # For mempool, height is -1 if unconfirmed txins, otherwise 0 - history = await self.chain_state.get_history(hashX) + history = await self.session_mgr.get_history(hashX) mempool = await self.mempool.transaction_summaries(hashX) status = ''.join('{}:{:d}:'.format(hash_to_hex_str(tx_hash), height) @@ -855,7 +874,7 @@ class ElectrumX(SessionBase): async def confirmed_and_unconfirmed_history(self, hashX): # Note history is ordered but unconfirmed is unordered in e-s - history = await self.chain_state.get_history(hashX) + history = await self.session_mgr.get_history(hashX) conf = [{'tx_hash': hash_to_hex_str(tx_hash), 'height': height} for tx_hash, height in history] return conf + await self.unconfirmed_history(hashX)