Move history cache to session_mgr

This commit is contained in:
Neil Booth 2018-08-06 14:08:53 +09:00
parent 790d462b55
commit 4bb78ecbbb
3 changed files with 24 additions and 18 deletions

View File

@ -7,7 +7,6 @@
import asyncio import asyncio
import pylru
from aiorpcx import run_in_thread from aiorpcx import run_in_thread
@ -19,24 +18,15 @@ class ChainState(object):
blocks, transaction history, UTXOs and the mempool. blocks, transaction history, UTXOs and the mempool.
''' '''
def __init__(self, env, daemon, bp, notifications): def __init__(self, env, daemon, bp):
self._env = env self._env = env
self._daemon = daemon self._daemon = daemon
self._bp = bp self._bp = bp
self._history_cache = pylru.lrucache(256)
# External interface pass-throughs for session.py # External interface pass-throughs for session.py
self.force_chain_reorg = self._bp.force_chain_reorg self.force_chain_reorg = self._bp.force_chain_reorg
self.tx_branch_and_root = self._bp.merkle.branch_and_root self.tx_branch_and_root = self._bp.merkle.branch_and_root
self.read_headers = self._bp.read_headers self.read_headers = self._bp.read_headers
# Cache maintenance
notifications.add_callback(self._notify)
async def _notify(self, height, touched):
# Invalidate our history cache for touched hashXs
hc = self._history_cache
for hashX in set(hc).intersection(touched):
del hc[hashX]
async def broadcast_transaction(self, raw_tx): async def broadcast_transaction(self, raw_tx):
return await self._daemon.sendrawtransaction([raw_tx]) return await self._daemon.sendrawtransaction([raw_tx])
@ -65,10 +55,7 @@ class ChainState(object):
limit = self._env.max_send // 97 limit = self._env.max_send // 97
return list(self._bp.get_history(hashX, limit=limit)) return list(self._bp.get_history(hashX, limit=limit))
hc = self._history_cache return await run_in_thread(job)
if hashX not in hc:
hc[hashX] = await run_in_thread(job)
return hc[hashX]
async def get_utxos(self, hashX): async def get_utxos(self, hashX):
'''Get UTXOs asynchronously to reduce latency.''' '''Get UTXOs asynchronously to reduce latency.'''

View File

@ -96,7 +96,7 @@ class Controller(ServerBase):
BlockProcessor = env.coin.BLOCK_PROCESSOR BlockProcessor = env.coin.BLOCK_PROCESSOR
bp = BlockProcessor(env, daemon, notifications) bp = BlockProcessor(env, daemon, notifications)
mempool = MemPool(env.coin, daemon, notifications, bp.lookup_utxos) mempool = MemPool(env.coin, daemon, notifications, bp.lookup_utxos)
chain_state = ChainState(env, daemon, bp, notifications) chain_state = ChainState(env, daemon, bp)
session_mgr = SessionManager(env, chain_state, mempool, session_mgr = SessionManager(env, chain_state, mempool,
notifications, shutdown_event) notifications, shutdown_event)

View File

@ -13,6 +13,7 @@ import datetime
import itertools import itertools
import json import json
import os import os
import pylru
import ssl import ssl
import time import time
from collections import defaultdict from collections import defaultdict
@ -119,6 +120,8 @@ class SessionManager(object):
self.state = self.CATCHING_UP self.state = self.CATCHING_UP
self.txs_sent = 0 self.txs_sent = 0
self.start_time = time.time() self.start_time = time.time()
self._history_cache = pylru.lrucache(256)
self._hc_height = 0
# Cache some idea of room to avoid recounting on each subscription # Cache some idea of room to avoid recounting on each subscription
self.subs_room = 0 self.subs_room = 0
# Masternode stuff only for such coins # Masternode stuff only for such coins
@ -442,8 +445,24 @@ class SessionManager(object):
'''The number of connections that we've sent something to.''' '''The number of connections that we've sent something to.'''
return len(self.sessions) return len(self.sessions)
async def get_history(self, hashX):
'''A caching layer.'''
hc = self._history_cache
if hashX not in hc:
hc[hashX] = await self.chain_state.get_history(hashX)
return hc[hashX]
async def _notify_sessions(self, height, touched): async def _notify_sessions(self, height, touched):
'''Notify sessions about height changes and touched addresses.''' '''Notify sessions about height changes and touched addresses.'''
# Invalidate our history cache for touched hashXs
if height != self._hc_height:
self._hc_height = height
hc = self._history_cache
hashXs = set(hc).intersection(touched)
text = [hash_to_hex_str(hashX) for hashX in hashXs]
for hashX in hashXs:
del hc[hashX]
async with TaskGroup() as group: async with TaskGroup() as group:
for session in self.sessions: for session in self.sessions:
await group.spawn(session.notify(height, touched)) await group.spawn(session.notify(height, touched))
@ -755,7 +774,7 @@ class ElectrumX(SessionBase):
''' '''
# Note history is ordered and mempool unordered in electrum-server # Note history is ordered and mempool unordered in electrum-server
# For mempool, height is -1 if unconfirmed txins, otherwise 0 # For mempool, height is -1 if unconfirmed txins, otherwise 0
history = await self.chain_state.get_history(hashX) history = await self.session_mgr.get_history(hashX)
mempool = await self.mempool.transaction_summaries(hashX) mempool = await self.mempool.transaction_summaries(hashX)
status = ''.join('{}:{:d}:'.format(hash_to_hex_str(tx_hash), height) status = ''.join('{}:{:d}:'.format(hash_to_hex_str(tx_hash), height)
@ -855,7 +874,7 @@ class ElectrumX(SessionBase):
async def confirmed_and_unconfirmed_history(self, hashX): async def confirmed_and_unconfirmed_history(self, hashX):
# Note history is ordered but unconfirmed is unordered in e-s # Note history is ordered but unconfirmed is unordered in e-s
history = await self.chain_state.get_history(hashX) history = await self.session_mgr.get_history(hashX)
conf = [{'tx_hash': hash_to_hex_str(tx_hash), 'height': height} conf = [{'tx_hash': hash_to_hex_str(tx_hash), 'height': height}
for tx_hash, height in history] for tx_hash, height in history]
return conf + await self.unconfirmed_history(hashX) return conf + await self.unconfirmed_history(hashX)