Split mempool out into new file.
Rework mempool handling
This commit is contained in:
parent
ed3db731c2
commit
1ef6a4d785
10
lib/util.py
10
lib/util.py
@ -9,6 +9,7 @@
|
||||
|
||||
|
||||
import array
|
||||
import asyncio
|
||||
import inspect
|
||||
import logging
|
||||
import sys
|
||||
@ -132,3 +133,12 @@ def increment_byte_string(bs):
|
||||
# This can only happen if all characters are 0xff
|
||||
bs = bytes([1]) + bs
|
||||
return bytes(bs)
|
||||
|
||||
async def asyncio_clean_shutdown(loop=None):
|
||||
while True:
|
||||
pending_tasks = [task for task in asyncio.Task.all_tasks(loop)
|
||||
if not task.done()]
|
||||
if len(pending_tasks) <= 1:
|
||||
break
|
||||
await asyncio.sleep(0)
|
||||
await asyncio.sleep(1)
|
||||
|
||||
@ -132,11 +132,9 @@ class BlockProcessor(server.db.DB):
|
||||
Coordinate backing up in case of chain reorganisations.
|
||||
'''
|
||||
|
||||
def __init__(self, client, env):
|
||||
def __init__(self, env, touched, touched_event):
|
||||
super().__init__(env)
|
||||
|
||||
self.client = client
|
||||
|
||||
# The block processor reads its tasks from this queue
|
||||
self.tasks = asyncio.Queue()
|
||||
|
||||
@ -151,6 +149,8 @@ class BlockProcessor(server.db.DB):
|
||||
self.caught_up = False
|
||||
self._shutdown = False
|
||||
self.event = asyncio.Event()
|
||||
self.touched = touched
|
||||
self.touched_event = touched_event
|
||||
|
||||
# Meta
|
||||
self.utxo_MB = env.utxo_MB
|
||||
@ -218,9 +218,8 @@ class BlockProcessor(server.db.DB):
|
||||
for block in blocks:
|
||||
if self._shutdown:
|
||||
break
|
||||
self.advance_block(block, touched)
|
||||
self.advance_block(block, self.touched)
|
||||
|
||||
touched = set()
|
||||
loop = asyncio.get_event_loop()
|
||||
try:
|
||||
if self.caught_up:
|
||||
@ -228,14 +227,14 @@ class BlockProcessor(server.db.DB):
|
||||
else:
|
||||
do_it()
|
||||
except ChainReorg:
|
||||
await self.handle_chain_reorg(touched)
|
||||
await self.handle_chain_reorg(self.touched)
|
||||
|
||||
if self.caught_up:
|
||||
# Flush everything as queries are performed on the DB and
|
||||
# not in-memory.
|
||||
await asyncio.sleep(0)
|
||||
self.flush(True)
|
||||
self.client.notify(touched)
|
||||
self.touched_event.set()
|
||||
elif time.time() > self.next_cache_check:
|
||||
self.check_cache_size()
|
||||
self.next_cache_check = time.time() + 60
|
||||
|
||||
287
server/mempool.py
Normal file
287
server/mempool.py
Normal file
@ -0,0 +1,287 @@
|
||||
# Copyright (c) 2016, Neil Booth
|
||||
#
|
||||
# All rights reserved.
|
||||
#
|
||||
# See the file "LICENCE" for information about the copyright
|
||||
# and warranty status of this software.
|
||||
|
||||
'''Mempool handling.'''
|
||||
|
||||
import asyncio
|
||||
import itertools
|
||||
import time
|
||||
from collections import defaultdict
|
||||
from functools import partial
|
||||
|
||||
from lib.hash import hash_to_str, hex_str_to_hash
|
||||
from lib.tx import Deserializer
|
||||
import lib.util as util
|
||||
from server.daemon import DaemonError
|
||||
|
||||
|
||||
class MemPool(util.LoggedClass):
|
||||
'''Representation of the daemon's mempool.
|
||||
|
||||
Updated regularly in caught-up state. Goal is to enable efficient
|
||||
response to the value() and transactions() calls.
|
||||
|
||||
To that end we maintain the following maps:
|
||||
|
||||
tx_hash -> (txin_pairs, txout_pairs)
|
||||
hash168 -> set of all tx hashes in which the hash168 appears
|
||||
|
||||
A pair is a (hash168, value) tuple. tx hashes are hex strings.
|
||||
'''
|
||||
|
||||
def __init__(self, daemon, coin, db, touched, touched_event):
|
||||
super().__init__()
|
||||
self.daemon = daemon
|
||||
self.coin = coin
|
||||
self.db = db
|
||||
self.touched = touched
|
||||
self.touched_event = touched_event
|
||||
self.stop = False
|
||||
self.txs = {}
|
||||
self.hash168s = defaultdict(set) # None can be a key
|
||||
|
||||
async def main_loop(self, caught_up):
|
||||
'''Asynchronously maintain mempool status with daemon.
|
||||
|
||||
Waits until the caught up event is signalled.'''
|
||||
await caught_up.wait()
|
||||
self.logger.info('beginning processing of daemon mempool. '
|
||||
'This can take some time...')
|
||||
try:
|
||||
await self.fetch_and_process()
|
||||
except asyncio.CancelledError:
|
||||
# This aids clean shutdowns
|
||||
self.stop = True
|
||||
|
||||
async def fetch_and_process(self):
|
||||
'''The inner loop unprotected by try / except.'''
|
||||
unfetched = set()
|
||||
unprocessed = {}
|
||||
log_every = 150
|
||||
log_secs = 0
|
||||
fetch_size = 400
|
||||
process_some = self.async_process_some(unfetched, fetch_size // 2)
|
||||
next_refresh = 0
|
||||
# The list of mempool hashes is fetched no more frequently
|
||||
# than this number of seconds
|
||||
refresh_secs = 5
|
||||
|
||||
while True:
|
||||
try:
|
||||
now = time.time()
|
||||
if now >= next_refresh:
|
||||
await self.new_hashes(unprocessed, unfetched)
|
||||
next_refresh = now + refresh_secs
|
||||
log_secs -= refresh_secs
|
||||
|
||||
# Fetch some txs if unfetched ones remain
|
||||
if unfetched:
|
||||
count = min(len(unfetched), fetch_size)
|
||||
hex_hashes = [unfetched.pop() for n in range(count)]
|
||||
unprocessed.update(await self.fetch_raw_txs(hex_hashes))
|
||||
|
||||
# Process some txs if unprocessed ones remain
|
||||
if unprocessed:
|
||||
await process_some(unprocessed)
|
||||
|
||||
if self.touched:
|
||||
self.touched_event.set()
|
||||
|
||||
if log_secs <= 0 and not unprocessed:
|
||||
log_secs = log_every
|
||||
self.logger.info('{:,d} txs touching {:,d} addresses'
|
||||
.format(len(self.txs),
|
||||
len(self.hash168s)))
|
||||
await asyncio.sleep(1)
|
||||
except DaemonError as e:
|
||||
self.logger.info('ignoring daemon error: {}'.format(e))
|
||||
|
||||
async def new_hashes(self, unprocessed, unfetched):
|
||||
'''Get the current list of hashes in the daemon's mempool.
|
||||
|
||||
Remove ones that have disappeared from self.txs and unprocessed.
|
||||
'''
|
||||
txs = self.txs
|
||||
hash168s = self.hash168s
|
||||
touched = self.touched
|
||||
|
||||
hashes = set(await self.daemon.mempool_hashes())
|
||||
new = hashes.difference(txs)
|
||||
gone = set(txs).difference(hashes)
|
||||
for hex_hash in gone:
|
||||
unprocessed.pop(hex_hash, None)
|
||||
item = txs.pop(hex_hash)
|
||||
if item:
|
||||
txin_pairs, txout_pairs = item
|
||||
tx_hash168s = set(hash168 for hash168, value in txin_pairs)
|
||||
tx_hash168s.update(hash168 for hash168, value in txout_pairs)
|
||||
for hash168 in tx_hash168s:
|
||||
hash168s[hash168].remove(hex_hash)
|
||||
if not hash168s[hash168]:
|
||||
del hash168s[hash168]
|
||||
touched.update(tx_hash168s)
|
||||
|
||||
unfetched.update(new)
|
||||
for hex_hash in new:
|
||||
txs[hex_hash] = None
|
||||
|
||||
def async_process_some(self, unfetched, limit):
|
||||
loop = asyncio.get_event_loop()
|
||||
pending = []
|
||||
txs = self.txs
|
||||
|
||||
async def process(unprocessed):
|
||||
nonlocal pending
|
||||
|
||||
raw_txs = {}
|
||||
while unprocessed and len(raw_txs) < limit:
|
||||
hex_hash, raw_tx = unprocessed.popitem()
|
||||
raw_txs[hex_hash] = raw_tx
|
||||
|
||||
if unprocessed:
|
||||
deferred = []
|
||||
else:
|
||||
deferred = pending
|
||||
pending = []
|
||||
|
||||
process_raw_txs = partial(self.process_raw_txs, raw_txs, deferred)
|
||||
result, deferred = (
|
||||
await loop.run_in_executor(None, process_raw_txs))
|
||||
|
||||
pending.extend(deferred)
|
||||
hash168s = self.hash168s
|
||||
touched = self.touched
|
||||
for hex_hash, in_out_pairs in result.items():
|
||||
if hex_hash in txs:
|
||||
txs[hex_hash] = in_out_pairs
|
||||
for hash168, value in itertools.chain(*in_out_pairs):
|
||||
touched.add(hash168)
|
||||
hash168s[hash168].add(hex_hash)
|
||||
|
||||
to_do = len(unfetched) + len(unprocessed)
|
||||
if to_do:
|
||||
percent = (len(txs) - to_do) * 100 // len(txs)
|
||||
self.logger.info('catchup {:d}% complete'.format(percent))
|
||||
|
||||
return process
|
||||
|
||||
async def fetch_raw_txs(self, hex_hashes):
|
||||
'''Fetch a list of mempool transactions.'''
|
||||
raw_txs = await self.daemon.getrawtransactions(hex_hashes)
|
||||
|
||||
# Skip hashes the daemon has dropped. Either they were
|
||||
# evicted or they got in a block.
|
||||
return {hh:raw for hh, raw in zip(hex_hashes, raw_txs) if raw}
|
||||
|
||||
def process_raw_txs(self, raw_tx_map, pending):
|
||||
'''Process the dictionary of raw transactions and return a dictionary
|
||||
of updates to apply to self.txs.
|
||||
|
||||
This runs in the executor so should not update any member
|
||||
variables it doesn't own. Atomic reads of self.txs that do
|
||||
not depend on the result remaining the same are fine.
|
||||
'''
|
||||
script_hash168 = self.coin.hash168_from_script()
|
||||
db_utxo_lookup = self.db.db_utxo_lookup
|
||||
txs = self.txs
|
||||
|
||||
# Deserialize each tx and put it in our priority queue
|
||||
for tx_hash, raw_tx in raw_tx_map.items():
|
||||
if not tx_hash in txs:
|
||||
continue
|
||||
tx = Deserializer(raw_tx).read_tx()
|
||||
|
||||
# Convert the tx outputs into (hash168, value) pairs
|
||||
txout_pairs = [(script_hash168(txout.pk_script), txout.value)
|
||||
for txout in tx.outputs]
|
||||
|
||||
# Convert the tx inputs to ([prev_hex_hash, prev_idx) pairs
|
||||
txin_pairs = [(hash_to_str(txin.prev_hash), txin.prev_idx)
|
||||
for txin in tx.inputs]
|
||||
|
||||
pending.append((tx_hash, txin_pairs, txout_pairs))
|
||||
|
||||
# Now process what we can
|
||||
result = {}
|
||||
deferred = []
|
||||
|
||||
for item in pending:
|
||||
if self.stop:
|
||||
break
|
||||
|
||||
tx_hash, old_txin_pairs, txout_pairs = item
|
||||
if tx_hash not in txs:
|
||||
continue
|
||||
|
||||
mempool_missing = False
|
||||
txin_pairs = []
|
||||
|
||||
try:
|
||||
for prev_hex_hash, prev_idx in old_txin_pairs:
|
||||
tx_info = txs.get(prev_hex_hash, 0)
|
||||
if tx_info is None:
|
||||
tx_info = result.get(prev_hex_hash)
|
||||
if not tx_info:
|
||||
mempool_missing = True
|
||||
continue
|
||||
if tx_info:
|
||||
txin_pairs.append(tx_info[1][prev_idx])
|
||||
elif not mempool_missing:
|
||||
prev_hash = hex_str_to_hash(prev_hex_hash)
|
||||
txin_pairs.append(db_utxo_lookup(prev_hash, prev_idx))
|
||||
except self.db.MissingUTXOError:
|
||||
# This typically happens just after the daemon has
|
||||
# accepted a new block and the new mempool has deps on
|
||||
# new txs in that block.
|
||||
continue
|
||||
|
||||
if mempool_missing:
|
||||
deferred.append(item)
|
||||
else:
|
||||
result[tx_hash] = (txin_pairs, txout_pairs)
|
||||
|
||||
return result, deferred
|
||||
|
||||
async def transactions(self, hash168):
|
||||
'''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
|
||||
entries for the hash168.
|
||||
|
||||
unconfirmed is True if any txin is unconfirmed.
|
||||
'''
|
||||
# hash168s is a defaultdict
|
||||
if not hash168 in self.hash168s:
|
||||
return []
|
||||
|
||||
hex_hashes = self.hash168s[hash168]
|
||||
raw_txs = self.bp.daemon.getrawtransactions(hex_hashes)
|
||||
result = []
|
||||
for hex_hash, raw_tx in zip(hex_hashes, raw_txs):
|
||||
item = self.txs.get(hex_hash)
|
||||
if not item or not raw_tx:
|
||||
continue
|
||||
tx = Deserializer(raw_tx).read_tx()
|
||||
txin_pairs, txout_pairs = item
|
||||
tx_fee = (sum(v for hash168, v in txin_pairs)
|
||||
- sum(v for hash168, v in txout_pairs))
|
||||
unconfirmed = any(txin.prev_hash not in self.txs
|
||||
for txin in tx.inputs)
|
||||
result.append((hex_hash, tx_fee, unconfirmed))
|
||||
return result
|
||||
|
||||
def value(self, hash168):
|
||||
'''Return the unconfirmed amount in the mempool for hash168.
|
||||
|
||||
Can be positive or negative.
|
||||
'''
|
||||
value = 0
|
||||
# hash168s is a defaultdict
|
||||
if hash168 in self.hash168s:
|
||||
for hex_hash in self.hash168s[hash168]:
|
||||
txin_pairs, txout_pairs = self.txs[hex_hash]
|
||||
value -= sum(v for h168, v in txin_pairs if h168 == hash168)
|
||||
value += sum(v for h168, v in txout_pairs if h168 == hash168)
|
||||
return value
|
||||
@ -22,192 +22,14 @@ import pylru
|
||||
|
||||
from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash
|
||||
from lib.jsonrpc import JSONRPC, RequestBase
|
||||
from lib.tx import Deserializer
|
||||
import lib.util as util
|
||||
from server.block_processor import BlockProcessor
|
||||
from server.daemon import DaemonError
|
||||
from server.irc import IRC
|
||||
from server.mempool import MemPool
|
||||
from server.version import VERSION
|
||||
|
||||
|
||||
class MemPool(util.LoggedClass):
|
||||
'''Representation of the daemon's mempool.
|
||||
|
||||
Updated regularly in caught-up state. Goal is to enable efficient
|
||||
response to the value() and transactions() calls.
|
||||
|
||||
To that end we maintain the following maps:
|
||||
|
||||
tx_hash -> [txin_pairs, txout_pairs, unconfirmed]
|
||||
hash168 -> set of all tx hashes in which the hash168 appears
|
||||
|
||||
A pair is a (hash168, value) tuple. Unconfirmed is true if any of the
|
||||
tx's txins are unconfirmed. tx hashes are hex strings.
|
||||
'''
|
||||
|
||||
def __init__(self, daemon, coin, db, manager):
|
||||
super().__init__()
|
||||
self.daemon = daemon
|
||||
self.coin = coin
|
||||
self.db = db
|
||||
self.manager = manager
|
||||
self.txs = {}
|
||||
self.hash168s = defaultdict(set) # None can be a key
|
||||
self.count = -1
|
||||
|
||||
async def main_loop(self, caught_up):
|
||||
'''Asynchronously maintain mempool status with daemon.
|
||||
|
||||
Waits until the caught up event is signalled.'''
|
||||
await caught_up.wait()
|
||||
self.logger.info('maintaining state with daemon...')
|
||||
while True:
|
||||
try:
|
||||
await self.update()
|
||||
await asyncio.sleep(5)
|
||||
except DaemonError as e:
|
||||
self.logger.info('ignoring daemon error: {}'.format(e))
|
||||
|
||||
async def update(self):
|
||||
'''Update state given the current mempool to the passed set of hashes.
|
||||
|
||||
Remove transactions that are no longer in our mempool.
|
||||
Request new transactions we don't have then add to our mempool.
|
||||
'''
|
||||
hex_hashes = set(await self.daemon.mempool_hashes())
|
||||
touched = set()
|
||||
missing_utxos = []
|
||||
|
||||
initial = self.count < 0
|
||||
if initial:
|
||||
self.logger.info('beginning import of {:,d} mempool txs'
|
||||
.format(len(hex_hashes)))
|
||||
|
||||
# Remove gone items
|
||||
gone = set(self.txs).difference(hex_hashes)
|
||||
for hex_hash in gone:
|
||||
txin_pairs, txout_pairs, unconfirmed = self.txs.pop(hex_hash)
|
||||
hash168s = set(hash168 for hash168, value in txin_pairs)
|
||||
hash168s.update(hash168 for hash168, value in txout_pairs)
|
||||
for hash168 in hash168s:
|
||||
self.hash168s[hash168].remove(hex_hash)
|
||||
if not self.hash168s[hash168]:
|
||||
del self.hash168s[hash168]
|
||||
touched.update(hash168s)
|
||||
|
||||
# Get the raw transactions for the new hashes. Ignore the
|
||||
# ones the daemon no longer has (it will return None). Put
|
||||
# them into a dictionary of hex hash to deserialized tx.
|
||||
hex_hashes.difference_update(self.txs)
|
||||
raw_txs = await self.daemon.getrawtransactions(hex_hashes)
|
||||
if initial:
|
||||
self.logger.info('analysing {:,d} mempool txs'
|
||||
.format(len(raw_txs)))
|
||||
new_txs = {hex_hash: Deserializer(raw_tx).read_tx()
|
||||
for hex_hash, raw_tx in zip(hex_hashes, raw_txs) if raw_tx}
|
||||
del raw_txs, hex_hashes
|
||||
|
||||
# The mempool is unordered, so process all outputs first so
|
||||
# that looking for inputs has full info.
|
||||
script_hash168 = self.coin.hash168_from_script()
|
||||
db_utxo_lookup = self.db.db_utxo_lookup
|
||||
|
||||
def txout_pair(txout):
|
||||
return (script_hash168(txout.pk_script), txout.value)
|
||||
|
||||
for n, (hex_hash, tx) in enumerate(new_txs.items()):
|
||||
# Yield to process e.g. signals
|
||||
if n % 20 == 0:
|
||||
await asyncio.sleep(0)
|
||||
txout_pairs = [txout_pair(txout) for txout in tx.outputs]
|
||||
self.txs[hex_hash] = (None, txout_pairs, None)
|
||||
|
||||
def txin_info(txin):
|
||||
hex_hash = hash_to_str(txin.prev_hash)
|
||||
mempool_entry = self.txs.get(hex_hash)
|
||||
if mempool_entry:
|
||||
return mempool_entry[1][txin.prev_idx], True
|
||||
pair = db_utxo_lookup(txin.prev_hash, txin.prev_idx)
|
||||
return pair, False
|
||||
|
||||
if initial:
|
||||
next_log = time.time()
|
||||
self.logger.info('processed outputs, now examining inputs. '
|
||||
'This can take some time...')
|
||||
|
||||
# Now add the inputs
|
||||
for n, (hex_hash, tx) in enumerate(new_txs.items()):
|
||||
# Yield to process e.g. signals
|
||||
await asyncio.sleep(0)
|
||||
|
||||
if initial and time.time() > next_log:
|
||||
next_log = time.time() + 20
|
||||
self.logger.info('{:,d} done ({:d}%)'
|
||||
.format(n, int(n / len(new_txs) * 100)))
|
||||
|
||||
txout_pairs = self.txs[hex_hash][1]
|
||||
try:
|
||||
infos = (txin_info(txin) for txin in tx.inputs)
|
||||
txin_pairs, unconfs = zip(*infos)
|
||||
except self.db.MissingUTXOError:
|
||||
# Drop this TX. If other mempool txs depend on it
|
||||
# it's harmless - next time the mempool is refreshed
|
||||
# they'll either be cleaned up or the UTXOs will no
|
||||
# longer be missing.
|
||||
del self.txs[hex_hash]
|
||||
continue
|
||||
self.txs[hex_hash] = (txin_pairs, txout_pairs, any(unconfs))
|
||||
|
||||
# Update touched and self.hash168s for the new tx
|
||||
for hash168, value in txin_pairs:
|
||||
self.hash168s[hash168].add(hex_hash)
|
||||
touched.add(hash168)
|
||||
for hash168, value in txout_pairs:
|
||||
self.hash168s[hash168].add(hex_hash)
|
||||
touched.add(hash168)
|
||||
|
||||
if missing_utxos:
|
||||
self.logger.info('{:,d} txs had missing UTXOs; probably the '
|
||||
'daemon is a block or two ahead of us.'
|
||||
.format(len(missing_utxos)))
|
||||
first = ', '.join('{} / {:,d}'.format(hash_to_str(txin.prev_hash),
|
||||
txin.prev_idx)
|
||||
for txin in sorted(missing_utxos)[:3])
|
||||
self.logger.info('first ones are {}'.format(first))
|
||||
|
||||
self.count += 1
|
||||
if self.count % 25 == 0 or gone:
|
||||
self.count = 0
|
||||
self.logger.info('{:,d} txs touching {:,d} addresses'
|
||||
.format(len(self.txs), len(self.hash168s)))
|
||||
|
||||
self.manager.notify(touched)
|
||||
|
||||
def transactions(self, hash168):
|
||||
'''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
|
||||
entries for the hash168.
|
||||
|
||||
unconfirmed is True if any txin is unconfirmed.
|
||||
'''
|
||||
for hex_hash in self.hash168s[hash168]:
|
||||
txin_pairs, txout_pairs, unconfirmed = self.txs[hex_hash]
|
||||
tx_fee = (sum(v for hash168, v in txin_pairs)
|
||||
- sum(v for hash168, v in txout_pairs))
|
||||
yield (hex_hash, tx_fee, unconfirmed)
|
||||
|
||||
def value(self, hash168):
|
||||
'''Return the unconfirmed amount in the mempool for hash168.
|
||||
|
||||
Can be positive or negative.
|
||||
'''
|
||||
value = 0
|
||||
for hex_hash in self.hash168s[hash168]:
|
||||
txin_pairs, txout_pairs, unconfirmed = self.txs[hex_hash]
|
||||
value -= sum(v for h168, v in txin_pairs if h168 == hash168)
|
||||
value += sum(v for h168, v in txout_pairs if h168 == hash168)
|
||||
return value
|
||||
|
||||
|
||||
class ServerManager(util.LoggedClass):
|
||||
'''Manages the client servers, a mempool, and a block processor.
|
||||
|
||||
@ -229,9 +51,13 @@ class ServerManager(util.LoggedClass):
|
||||
|
||||
def __init__(self, env):
|
||||
super().__init__()
|
||||
self.loop = asyncio.get_event_loop()
|
||||
self.touched = set()
|
||||
self.touched_event = asyncio.Event()
|
||||
self.start = time.time()
|
||||
self.bp = BlockProcessor(self, env)
|
||||
self.mempool = MemPool(self.bp.daemon, env.coin, self.bp, self)
|
||||
self.bp = BlockProcessor(env, self.touched, self.touched_event)
|
||||
self.mempool = MemPool(self.bp.daemon, env.coin, self.bp,
|
||||
self.touched, self.touched_event)
|
||||
self.irc = IRC(env)
|
||||
self.env = env
|
||||
self.servers = []
|
||||
@ -261,13 +87,13 @@ class ServerManager(util.LoggedClass):
|
||||
self.logger.info('max subscriptions per session: {:,d}'
|
||||
.format(env.max_session_subs))
|
||||
|
||||
def mempool_transactions(self, hash168):
|
||||
async def mempool_transactions(self, hash168):
|
||||
'''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
|
||||
entries for the hash168.
|
||||
|
||||
unconfirmed is True if any txin is unconfirmed.
|
||||
'''
|
||||
return self.mempool.transactions(hash168)
|
||||
return await self.mempool.transactions(hash168)
|
||||
|
||||
def mempool_value(self, hash168):
|
||||
'''Return the unconfirmed amount in the mempool for hash168.
|
||||
@ -343,10 +169,11 @@ class ServerManager(util.LoggedClass):
|
||||
# shutdown() assumes bp.main_loop() is first
|
||||
add_future(self.bp.main_loop())
|
||||
add_future(self.bp.prefetcher.main_loop())
|
||||
add_future(self.mempool.main_loop(self.bp.event))
|
||||
add_future(self.irc.start(self.bp.event))
|
||||
add_future(self.start_servers(self.bp.event))
|
||||
add_future(self.mempool.main_loop(self.bp.event))
|
||||
add_future(self.enqueue_delayed_sessions())
|
||||
add_future(self.notify())
|
||||
for n in range(4):
|
||||
add_future(self.serve_requests())
|
||||
|
||||
@ -356,12 +183,12 @@ class ServerManager(util.LoggedClass):
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
await self.shutdown()
|
||||
await util.asyncio_clean_shutdown()
|
||||
|
||||
async def start_server(self, kind, *args, **kw_args):
|
||||
loop = asyncio.get_event_loop()
|
||||
protocol_class = LocalRPC if kind == 'RPC' else ElectrumX
|
||||
protocol = partial(protocol_class, self, self.bp, self.env, kind)
|
||||
server = loop.create_server(protocol, *args, **kw_args)
|
||||
server = self.loop.create_server(protocol, *args, **kw_args)
|
||||
|
||||
host, port = args[:2]
|
||||
try:
|
||||
@ -395,27 +222,34 @@ class ServerManager(util.LoggedClass):
|
||||
sslc.load_cert_chain(env.ssl_certfile, keyfile=env.ssl_keyfile)
|
||||
await self.start_server('SSL', env.host, env.ssl_port, ssl=sslc)
|
||||
|
||||
def notify(self, touched):
|
||||
async def notify(self):
|
||||
'''Notify sessions about height changes and touched addresses.'''
|
||||
# Invalidate caches
|
||||
hc = self.history_cache
|
||||
for hash168 in set(hc).intersection(touched):
|
||||
del hc[hash168]
|
||||
if self.bp.db_height != self.height:
|
||||
self.height = self.bp.db_height
|
||||
self.header_cache.clear()
|
||||
while True:
|
||||
await self.touched_event.wait()
|
||||
touched = self.touched.copy()
|
||||
self.touched.clear()
|
||||
self.touched_event.clear()
|
||||
|
||||
for session in self.sessions:
|
||||
if isinstance(session, ElectrumX):
|
||||
request = self.NotificationRequest(self.bp.db_height, touched)
|
||||
session.enqueue_request(request)
|
||||
# Periodically log sessions
|
||||
if self.env.log_sessions and time.time() > self.next_log_sessions:
|
||||
data = self.session_data(for_log=True)
|
||||
for line in ServerManager.sessions_text_lines(data):
|
||||
self.logger.info(line)
|
||||
self.logger.info(json.dumps(self.server_summary()))
|
||||
self.next_log_sessions = time.time() + self.env.log_sessions
|
||||
# Invalidate caches
|
||||
hc = self.history_cache
|
||||
for hash168 in set(hc).intersection(touched):
|
||||
del hc[hash168]
|
||||
if self.bp.db_height != self.height:
|
||||
self.height = self.bp.db_height
|
||||
self.header_cache.clear()
|
||||
|
||||
for session in self.sessions:
|
||||
if isinstance(session, ElectrumX):
|
||||
request = self.NotificationRequest(self.bp.db_height,
|
||||
touched)
|
||||
session.enqueue_request(request)
|
||||
# Periodically log sessions
|
||||
if self.env.log_sessions and time.time() > self.next_log_sessions:
|
||||
data = self.session_data(for_log=True)
|
||||
for line in ServerManager.sessions_text_lines(data):
|
||||
self.logger.info(line)
|
||||
self.logger.info(json.dumps(self.server_summary()))
|
||||
self.next_log_sessions = time.time() + self.env.log_sessions
|
||||
|
||||
def electrum_header(self, height):
|
||||
'''Return the binary header at the given height.'''
|
||||
@ -457,8 +291,6 @@ class ServerManager(util.LoggedClass):
|
||||
server.close()
|
||||
await server.wait_closed()
|
||||
self.servers = [] # So add_session closes new sessions
|
||||
while not all(future.done() for future in self.futures):
|
||||
await asyncio.sleep(0)
|
||||
if self.sessions:
|
||||
await self.close_sessions()
|
||||
|
||||
@ -474,7 +306,6 @@ class ServerManager(util.LoggedClass):
|
||||
await asyncio.sleep(2)
|
||||
self.logger.info('{:,d} sessions remaining'
|
||||
.format(len(self.sessions)))
|
||||
await asyncio.sleep(1)
|
||||
|
||||
def add_session(self, session):
|
||||
# Some connections are acknowledged after the servers are closed
|
||||
@ -491,9 +322,12 @@ class ServerManager(util.LoggedClass):
|
||||
.format(session.peername(), len(self.sessions)))
|
||||
|
||||
def remove_session(self, session):
|
||||
group = self.sessions.pop(session)
|
||||
group.remove(session)
|
||||
self.subscription_count -= session.sub_count()
|
||||
# This test should always be True. However if a bug messes
|
||||
# things up it prevents consequent log noise
|
||||
if session in self.sessions:
|
||||
group = self.sessions.pop(session)
|
||||
group.remove(session)
|
||||
self.subscription_count -= session.sub_count()
|
||||
|
||||
def close_session(self, session):
|
||||
'''Close the session's transport and cancel its future.'''
|
||||
@ -762,6 +596,7 @@ class Session(JSONRPC):
|
||||
self.log_error('error handling request {}'.format(request))
|
||||
traceback.print_exc()
|
||||
errs.append(request)
|
||||
await asyncio.sleep(0)
|
||||
if total >= 8:
|
||||
break
|
||||
|
||||
@ -905,7 +740,7 @@ class ElectrumX(Session):
|
||||
# Note history is ordered and mempool unordered in electrum-server
|
||||
# For mempool, height is -1 if unconfirmed txins, otherwise 0
|
||||
history = await self.manager.async_get_history(hash168)
|
||||
mempool = self.manager.mempool_transactions(hash168)
|
||||
mempool = await self.manager.mempool_transactions(hash168)
|
||||
|
||||
status = ''.join('{}:{:d}:'.format(hash_to_str(tx_hash), height)
|
||||
for tx_hash, height in history)
|
||||
@ -940,10 +775,10 @@ class ElectrumX(Session):
|
||||
|
||||
return {"block_height": height, "merkle": merkle_branch, "pos": pos}
|
||||
|
||||
def unconfirmed_history(self, hash168):
|
||||
async def unconfirmed_history(self, hash168):
|
||||
# Note unconfirmed history is unordered in electrum-server
|
||||
# Height is -1 if unconfirmed txins, otherwise 0
|
||||
mempool = self.manager.mempool_transactions(hash168)
|
||||
mempool = await self.manager.mempool_transactions(hash168)
|
||||
return [{'tx_hash': tx_hash, 'height': -unconfirmed, 'fee': fee}
|
||||
for tx_hash, fee, unconfirmed in mempool]
|
||||
|
||||
@ -953,7 +788,7 @@ class ElectrumX(Session):
|
||||
conf = [{'tx_hash': hash_to_str(tx_hash), 'height': height}
|
||||
for tx_hash, height in history]
|
||||
|
||||
return conf + self.unconfirmed_history(hash168)
|
||||
return conf + await self.unconfirmed_history(hash168)
|
||||
|
||||
def get_chunk(self, index):
|
||||
'''Return header chunk as hex. Index is a non-negative integer.'''
|
||||
@ -995,7 +830,7 @@ class ElectrumX(Session):
|
||||
|
||||
async def address_get_mempool(self, params):
|
||||
hash168 = self.params_to_hash168(params)
|
||||
return self.unconfirmed_history(hash168)
|
||||
return await self.unconfirmed_history(hash168)
|
||||
|
||||
async def address_get_proof(self, params):
|
||||
hash168 = self.params_to_hash168(params)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user