Rename get_history to limited_history
- make it async and run in a thread
This commit is contained in:
parent
a036a2eb3f
commit
12dbf2c74a
@ -73,8 +73,8 @@ async def query(args):
|
|||||||
if not hashX:
|
if not hashX:
|
||||||
continue
|
continue
|
||||||
n = None
|
n = None
|
||||||
for n, (tx_hash, height) in enumerate(db.get_history(hashX, limit),
|
history = await db.limited_history(hashX, limit=limit)
|
||||||
start=1):
|
for n, (tx_hash, height) in enumerate(history, start=1):
|
||||||
print(f'History #{n:,d}: height {height:,d} '
|
print(f'History #{n:,d}: height {height:,d} '
|
||||||
f'tx_hash {hash_to_hex_str(tx_hash)}')
|
f'tx_hash {hash_to_hex_str(tx_hash)}')
|
||||||
if n is None:
|
if n is None:
|
||||||
|
|||||||
@ -8,8 +8,6 @@
|
|||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
|
|
||||||
from aiorpcx import run_in_thread
|
|
||||||
|
|
||||||
from electrumx.lib.hash import hash_to_hex_str
|
from electrumx.lib.hash import hash_to_hex_str
|
||||||
|
|
||||||
|
|
||||||
@ -28,6 +26,7 @@ class ChainState(object):
|
|||||||
self.tx_branch_and_root = self._bp.merkle.branch_and_root
|
self.tx_branch_and_root = self._bp.merkle.branch_and_root
|
||||||
self.read_headers = self._bp.read_headers
|
self.read_headers = self._bp.read_headers
|
||||||
self.all_utxos = self._bp.all_utxos
|
self.all_utxos = self._bp.all_utxos
|
||||||
|
self.limited_history = self._bp.limited_history
|
||||||
|
|
||||||
async def broadcast_transaction(self, raw_tx):
|
async def broadcast_transaction(self, raw_tx):
|
||||||
return await self._daemon.sendrawtransaction([raw_tx])
|
return await self._daemon.sendrawtransaction([raw_tx])
|
||||||
@ -46,18 +45,6 @@ class ChainState(object):
|
|||||||
'db_height': self.db_height(),
|
'db_height': self.db_height(),
|
||||||
}
|
}
|
||||||
|
|
||||||
async def get_history(self, hashX):
|
|
||||||
'''Get history asynchronously to reduce latency.'''
|
|
||||||
def job():
|
|
||||||
# History DoS limit. Each element of history is about 99
|
|
||||||
# bytes when encoded as JSON. This limits resource usage
|
|
||||||
# on bloated history requests, and uses a smaller divisor
|
|
||||||
# so large requests are logged before refusing them.
|
|
||||||
limit = self._env.max_send // 97
|
|
||||||
return list(self._bp.get_history(hashX, limit=limit))
|
|
||||||
|
|
||||||
return await run_in_thread(job)
|
|
||||||
|
|
||||||
def header_branch_and_root(self, length, height):
|
def header_branch_and_root(self, length, height):
|
||||||
return self._bp.header_mc.branch_and_root(length, height)
|
return self._bp.header_mc.branch_and_root(length, height)
|
||||||
|
|
||||||
@ -102,8 +89,8 @@ class ChainState(object):
|
|||||||
if not hashX:
|
if not hashX:
|
||||||
continue
|
continue
|
||||||
n = None
|
n = None
|
||||||
for n, (tx_hash, height) in enumerate(
|
history = await db.limited_history(hashX, limit=limit)
|
||||||
db.get_history(hashX, limit), start=1):
|
for n, (tx_hash, height) in enumerate(history):
|
||||||
lines.append(f'History #{n:,d}: height {height:,d} '
|
lines.append(f'History #{n:,d}: height {height:,d} '
|
||||||
f'tx_hash {hash_to_hex_str(tx_hash)}')
|
f'tx_hash {hash_to_hex_str(tx_hash)}')
|
||||||
if n is None:
|
if n is None:
|
||||||
|
|||||||
@ -227,15 +227,19 @@ class DB(object):
|
|||||||
|
|
||||||
return [self.coin.header_hash(header) for header in headers]
|
return [self.coin.header_hash(header) for header in headers]
|
||||||
|
|
||||||
def get_history(self, hashX, limit=1000):
|
async def limited_history(self, hashX, *, limit=1000):
|
||||||
'''Generator that returns an unpruned, sorted list of (tx_hash,
|
'''Return an unpruned, sorted list of (tx_hash, height) tuples of
|
||||||
height) tuples of confirmed transactions that touched the address,
|
confirmed transactions that touched the address, earliest in
|
||||||
earliest in the blockchain first. Includes both spending and
|
the blockchain first. Includes both spending and receiving
|
||||||
receiving transactions. By default yields at most 1000 entries.
|
transactions. By default returns at most 1000 entries. Set
|
||||||
Set limit to None to get them all.
|
limit to None to get them all.
|
||||||
'''
|
'''
|
||||||
for tx_num in self.history.get_txnums(hashX, limit):
|
def read_history():
|
||||||
yield self.fs_tx_hash(tx_num)
|
tx_nums = list(self.history.get_txnums(hashX, limit))
|
||||||
|
fs_tx_hash = self.fs_tx_hash
|
||||||
|
return [fs_tx_hash(tx_num) for tx_num in tx_nums]
|
||||||
|
|
||||||
|
return await run_in_thread(read_history)
|
||||||
|
|
||||||
# -- Undo information
|
# -- Undo information
|
||||||
|
|
||||||
|
|||||||
@ -444,11 +444,17 @@ class SessionManager(object):
|
|||||||
'''The number of connections that we've sent something to.'''
|
'''The number of connections that we've sent something to.'''
|
||||||
return len(self.sessions)
|
return len(self.sessions)
|
||||||
|
|
||||||
async def get_history(self, hashX):
|
async def limited_history(self, hashX):
|
||||||
'''A caching layer.'''
|
'''A caching layer.'''
|
||||||
hc = self._history_cache
|
hc = self._history_cache
|
||||||
if hashX not in hc:
|
if hashX not in hc:
|
||||||
hc[hashX] = await self.chain_state.get_history(hashX)
|
# History DoS limit. Each element of history is about 99
|
||||||
|
# bytes when encoded as JSON. This limits resource usage
|
||||||
|
# on bloated history requests, and uses a smaller divisor
|
||||||
|
# so large requests are logged before refusing them.
|
||||||
|
limit = self.env.max_send // 97
|
||||||
|
hc[hashX] = await self.chain_state.limited_history(hashX,
|
||||||
|
limit=limit)
|
||||||
return hc[hashX]
|
return hc[hashX]
|
||||||
|
|
||||||
async def _notify_sessions(self, height, touched):
|
async def _notify_sessions(self, height, touched):
|
||||||
@ -773,7 +779,7 @@ class ElectrumX(SessionBase):
|
|||||||
'''
|
'''
|
||||||
# Note history is ordered and mempool unordered in electrum-server
|
# Note history is ordered and mempool unordered in electrum-server
|
||||||
# For mempool, height is -1 if unconfirmed txins, otherwise 0
|
# For mempool, height is -1 if unconfirmed txins, otherwise 0
|
||||||
history = await self.session_mgr.get_history(hashX)
|
history = await self.session_mgr.limited_history(hashX)
|
||||||
mempool = await self.mempool.transaction_summaries(hashX)
|
mempool = await self.mempool.transaction_summaries(hashX)
|
||||||
|
|
||||||
status = ''.join('{}:{:d}:'.format(hash_to_hex_str(tx_hash), height)
|
status = ''.join('{}:{:d}:'.format(hash_to_hex_str(tx_hash), height)
|
||||||
@ -873,7 +879,7 @@ class ElectrumX(SessionBase):
|
|||||||
|
|
||||||
async def confirmed_and_unconfirmed_history(self, hashX):
|
async def confirmed_and_unconfirmed_history(self, hashX):
|
||||||
# Note history is ordered but unconfirmed is unordered in e-s
|
# Note history is ordered but unconfirmed is unordered in e-s
|
||||||
history = await self.session_mgr.get_history(hashX)
|
history = await self.session_mgr.limited_history(hashX)
|
||||||
conf = [{'tx_hash': hash_to_hex_str(tx_hash), 'height': height}
|
conf = [{'tx_hash': hash_to_hex_str(tx_hash), 'height': height}
|
||||||
for tx_hash, height in history]
|
for tx_hash, height in history]
|
||||||
return conf + await self.unconfirmed_history(hashX)
|
return conf + await self.unconfirmed_history(hashX)
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user