Merge branch 'no_mempool_wait' into develop
This commit is contained in:
commit
892a3de85b
@ -21,7 +21,6 @@ from functools import partial
|
||||
from server.daemon import Daemon, DaemonError
|
||||
from server.version import VERSION
|
||||
from lib.hash import hash_to_str
|
||||
from lib.tx import Deserializer
|
||||
from lib.util import chunks, LoggedClass
|
||||
import server.db
|
||||
from server.storage import open_db
|
||||
@ -152,170 +151,6 @@ class ChainReorg(Exception):
|
||||
'''Raised on a blockchain reorganisation.'''
|
||||
|
||||
|
||||
class MemPool(LoggedClass):
|
||||
'''Representation of the daemon's mempool.
|
||||
|
||||
Updated regularly in caught-up state. Goal is to enable efficient
|
||||
response to the value() and transactions() calls.
|
||||
|
||||
To that end we maintain the following maps:
|
||||
|
||||
tx_hash -> [txin_pairs, txout_pairs, unconfirmed]
|
||||
hash168 -> set of all tx hashes in which the hash168 appears
|
||||
|
||||
A pair is a (hash168, value) tuple. Unconfirmed is true if any of the
|
||||
tx's txins are unconfirmed. tx hashes are hex strings.
|
||||
'''
|
||||
|
||||
def __init__(self, bp):
|
||||
super().__init__()
|
||||
self.txs = {}
|
||||
self.hash168s = defaultdict(set) # None can be a key
|
||||
self.bp = bp
|
||||
self.count = -1
|
||||
|
||||
async def update(self, hex_hashes):
|
||||
'''Update state given the current mempool to the passed set of hashes.
|
||||
|
||||
Remove transactions that are no longer in our mempool.
|
||||
Request new transactions we don't have then add to our mempool.
|
||||
'''
|
||||
hex_hashes = set(hex_hashes)
|
||||
touched = set()
|
||||
missing_utxos = []
|
||||
|
||||
initial = self.count < 0
|
||||
if initial:
|
||||
self.logger.info('beginning import of {:,d} mempool txs'
|
||||
.format(len(hex_hashes)))
|
||||
|
||||
# Remove gone items
|
||||
gone = set(self.txs).difference(hex_hashes)
|
||||
for hex_hash in gone:
|
||||
txin_pairs, txout_pairs, unconfirmed = self.txs.pop(hex_hash)
|
||||
hash168s = set(hash168 for hash168, value in txin_pairs)
|
||||
hash168s.update(hash168 for hash168, value in txout_pairs)
|
||||
for hash168 in hash168s:
|
||||
self.hash168s[hash168].remove(hex_hash)
|
||||
if not self.hash168s[hash168]:
|
||||
del self.hash168s[hash168]
|
||||
touched.update(hash168s)
|
||||
|
||||
# Get the raw transactions for the new hashes. Ignore the
|
||||
# ones the daemon no longer has (it will return None). Put
|
||||
# them into a dictionary of hex hash to deserialized tx.
|
||||
hex_hashes.difference_update(self.txs)
|
||||
raw_txs = await self.bp.daemon.getrawtransactions(hex_hashes)
|
||||
if initial:
|
||||
self.logger.info('analysing {:,d} mempool txs'
|
||||
.format(len(raw_txs)))
|
||||
new_txs = {hex_hash: Deserializer(raw_tx).read_tx()
|
||||
for hex_hash, raw_tx in zip(hex_hashes, raw_txs) if raw_tx}
|
||||
del raw_txs, hex_hashes
|
||||
|
||||
# The mempool is unordered, so process all outputs first so
|
||||
# that looking for inputs has full info.
|
||||
script_hash168 = self.bp.coin.hash168_from_script()
|
||||
db_utxo_lookup = self.bp.db_utxo_lookup
|
||||
|
||||
def txout_pair(txout):
|
||||
return (script_hash168(txout.pk_script), txout.value)
|
||||
|
||||
for n, (hex_hash, tx) in enumerate(new_txs.items()):
|
||||
# Yield to process e.g. signals
|
||||
if n % 100 == 0:
|
||||
await asyncio.sleep(0)
|
||||
txout_pairs = [txout_pair(txout) for txout in tx.outputs]
|
||||
self.txs[hex_hash] = (None, txout_pairs, None)
|
||||
|
||||
def txin_info(txin):
|
||||
hex_hash = hash_to_str(txin.prev_hash)
|
||||
mempool_entry = self.txs.get(hex_hash)
|
||||
if mempool_entry:
|
||||
return mempool_entry[1][txin.prev_idx], True
|
||||
pair = db_utxo_lookup(txin.prev_hash, txin.prev_idx)
|
||||
return pair, False
|
||||
|
||||
if initial:
|
||||
next_log = time.time()
|
||||
self.logger.info('processed outputs, now examining inputs. '
|
||||
'This can take some time...')
|
||||
|
||||
# Now add the inputs
|
||||
for n, (hex_hash, tx) in enumerate(new_txs.items()):
|
||||
# Yield to process e.g. signals
|
||||
if n % 10 == 0:
|
||||
await asyncio.sleep(0)
|
||||
|
||||
if initial and time.time() > next_log:
|
||||
next_log = time.time() + 20
|
||||
self.logger.info('{:,d} done ({:d}%)'
|
||||
.format(n, int(n / len(new_txs) * 100)))
|
||||
|
||||
txout_pairs = self.txs[hex_hash][1]
|
||||
try:
|
||||
infos = (txin_info(txin) for txin in tx.inputs)
|
||||
txin_pairs, unconfs = zip(*infos)
|
||||
except self.bp.MissingUTXOError:
|
||||
# Drop this TX. If other mempool txs depend on it
|
||||
# it's harmless - next time the mempool is refreshed
|
||||
# they'll either be cleaned up or the UTXOs will no
|
||||
# longer be missing.
|
||||
del self.txs[hex_hash]
|
||||
continue
|
||||
self.txs[hex_hash] = (txin_pairs, txout_pairs, any(unconfs))
|
||||
|
||||
# Update touched and self.hash168s for the new tx
|
||||
for hash168, value in txin_pairs:
|
||||
self.hash168s[hash168].add(hex_hash)
|
||||
touched.add(hash168)
|
||||
for hash168, value in txout_pairs:
|
||||
self.hash168s[hash168].add(hex_hash)
|
||||
touched.add(hash168)
|
||||
|
||||
if missing_utxos:
|
||||
self.logger.info('{:,d} txs had missing UTXOs; probably the '
|
||||
'daemon is a block or two ahead of us.'
|
||||
.format(len(missing_utxos)))
|
||||
first = ', '.join('{} / {:,d}'.format(hash_to_str(txin.prev_hash),
|
||||
txin.prev_idx)
|
||||
for txin in sorted(missing_utxos)[:3])
|
||||
self.logger.info('first ones are {}'.format(first))
|
||||
|
||||
self.count += 1
|
||||
if self.count % 25 == 0 or gone:
|
||||
self.count = 0
|
||||
self.logger.info('{:,d} txs touching {:,d} addresses'
|
||||
.format(len(self.txs), len(self.hash168s)))
|
||||
|
||||
# Might include a None
|
||||
return touched
|
||||
|
||||
def transactions(self, hash168):
|
||||
'''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
|
||||
entries for the hash168.
|
||||
|
||||
unconfirmed is True if any txin is unconfirmed.
|
||||
'''
|
||||
for hex_hash in self.hash168s[hash168]:
|
||||
txin_pairs, txout_pairs, unconfirmed = self.txs[hex_hash]
|
||||
tx_fee = (sum(v for hash168, v in txin_pairs)
|
||||
- sum(v for hash168, v in txout_pairs))
|
||||
yield (hex_hash, tx_fee, unconfirmed)
|
||||
|
||||
def value(self, hash168):
|
||||
'''Return the unconfirmed amount in the mempool for hash168.
|
||||
|
||||
Can be positive or negative.
|
||||
'''
|
||||
value = 0
|
||||
for hex_hash in self.hash168s[hash168]:
|
||||
txin_pairs, txout_pairs, unconfirmed = self.txs[hex_hash]
|
||||
value -= sum(v for h168, v in txin_pairs if h168 == hash168)
|
||||
value += sum(v for h168, v in txout_pairs if h168 == hash168)
|
||||
return value
|
||||
|
||||
|
||||
class BlockProcessor(server.db.DB):
|
||||
'''Process blocks and update the DB state to match.
|
||||
|
||||
@ -335,7 +170,6 @@ class BlockProcessor(server.db.DB):
|
||||
|
||||
self.daemon = Daemon(env.daemon_url, env.debug)
|
||||
self.daemon.debug_set_height(self.height)
|
||||
self.mempool = MemPool(self)
|
||||
self.touched = set()
|
||||
self.futures = []
|
||||
|
||||
@ -423,12 +257,11 @@ class BlockProcessor(server.db.DB):
|
||||
'''Called after each deamon poll if caught up.'''
|
||||
# Caught up to daemon height. Flush everything as queries
|
||||
# are performed on the DB and not in-memory.
|
||||
self.flush(True)
|
||||
if self.first_sync:
|
||||
self.first_sync = False
|
||||
self.logger.info('{} synced to height {:,d}. DB version:'
|
||||
.format(VERSION, self.height, self.db_version))
|
||||
self.touched.update(await self.mempool.update(mempool_hashes))
|
||||
self.flush(True)
|
||||
|
||||
async def handle_chain_reorg(self):
|
||||
# First get all state on disk
|
||||
@ -1025,18 +858,3 @@ class BlockProcessor(server.db.DB):
|
||||
tx_hash = tx_hashes[tx_num - self.tx_counts[tx_height - 1]]
|
||||
|
||||
return tx_hash, tx_height
|
||||
|
||||
def mempool_transactions(self, hash168):
|
||||
'''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
|
||||
entries for the hash168.
|
||||
|
||||
unconfirmed is True if any txin is unconfirmed.
|
||||
'''
|
||||
return self.mempool.transactions(hash168)
|
||||
|
||||
def mempool_value(self, hash168):
|
||||
'''Return the unconfirmed amount in the mempool for hash168.
|
||||
|
||||
Can be positive or negative.
|
||||
'''
|
||||
return self.mempool.value(hash168)
|
||||
|
||||
@ -14,11 +14,12 @@ import json
|
||||
import ssl
|
||||
import time
|
||||
import traceback
|
||||
from collections import namedtuple
|
||||
from collections import defaultdict, namedtuple
|
||||
from functools import partial
|
||||
|
||||
from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash
|
||||
from lib.jsonrpc import JSONRPC, json_notification_payload
|
||||
from lib.tx import Deserializer
|
||||
from lib.util import LoggedClass
|
||||
from server.block_processor import BlockProcessor
|
||||
from server.daemon import DaemonError
|
||||
@ -27,19 +28,25 @@ from server.version import VERSION
|
||||
|
||||
|
||||
class BlockServer(BlockProcessor):
|
||||
'''Like BlockProcessor but also has a server manager and starts
|
||||
servers when caught up.'''
|
||||
'''Like BlockProcessor but also has a mempool and a server manager.
|
||||
|
||||
Servers are started immediately the block processor first catches
|
||||
up with the daemon.
|
||||
'''
|
||||
|
||||
def __init__(self, env):
|
||||
super().__init__(env)
|
||||
self.server_mgr = ServerManager(self, env)
|
||||
self.bs_caught_up = False
|
||||
self.mempool = MemPool(self)
|
||||
self.caught_up_yet = False
|
||||
|
||||
async def caught_up(self, mempool_hashes):
|
||||
# Call the base class to flush before doing anything else.
|
||||
await super().caught_up(mempool_hashes)
|
||||
if not self.bs_caught_up:
|
||||
if not self.caught_up_yet:
|
||||
await self.server_mgr.start_servers()
|
||||
self.bs_caught_up = True
|
||||
self.caught_up_yet = True
|
||||
self.touched.update(await self.mempool.update(mempool_hashes))
|
||||
self.server_mgr.notify(self.height, self.touched)
|
||||
|
||||
def on_cancel(self):
|
||||
@ -47,6 +54,185 @@ class BlockServer(BlockProcessor):
|
||||
self.server_mgr.stop()
|
||||
super().on_cancel()
|
||||
|
||||
def mempool_transactions(self, hash168):
|
||||
'''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
|
||||
entries for the hash168.
|
||||
|
||||
unconfirmed is True if any txin is unconfirmed.
|
||||
'''
|
||||
return self.mempool.transactions(hash168)
|
||||
|
||||
def mempool_value(self, hash168):
|
||||
'''Return the unconfirmed amount in the mempool for hash168.
|
||||
|
||||
Can be positive or negative.
|
||||
'''
|
||||
return self.mempool.value(hash168)
|
||||
|
||||
|
||||
class MemPool(LoggedClass):
|
||||
'''Representation of the daemon's mempool.
|
||||
|
||||
Updated regularly in caught-up state. Goal is to enable efficient
|
||||
response to the value() and transactions() calls.
|
||||
|
||||
To that end we maintain the following maps:
|
||||
|
||||
tx_hash -> [txin_pairs, txout_pairs, unconfirmed]
|
||||
hash168 -> set of all tx hashes in which the hash168 appears
|
||||
|
||||
A pair is a (hash168, value) tuple. Unconfirmed is true if any of the
|
||||
tx's txins are unconfirmed. tx hashes are hex strings.
|
||||
'''
|
||||
|
||||
def __init__(self, bp):
|
||||
super().__init__()
|
||||
self.txs = {}
|
||||
self.hash168s = defaultdict(set) # None can be a key
|
||||
self.bp = bp
|
||||
self.count = -1
|
||||
|
||||
async def update(self, hex_hashes):
|
||||
'''Update state given the current mempool to the passed set of hashes.
|
||||
|
||||
Remove transactions that are no longer in our mempool.
|
||||
Request new transactions we don't have then add to our mempool.
|
||||
'''
|
||||
hex_hashes = set(hex_hashes)
|
||||
touched = set()
|
||||
missing_utxos = []
|
||||
|
||||
initial = self.count < 0
|
||||
if initial:
|
||||
self.logger.info('beginning import of {:,d} mempool txs'
|
||||
.format(len(hex_hashes)))
|
||||
|
||||
# Remove gone items
|
||||
gone = set(self.txs).difference(hex_hashes)
|
||||
for hex_hash in gone:
|
||||
txin_pairs, txout_pairs, unconfirmed = self.txs.pop(hex_hash)
|
||||
hash168s = set(hash168 for hash168, value in txin_pairs)
|
||||
hash168s.update(hash168 for hash168, value in txout_pairs)
|
||||
for hash168 in hash168s:
|
||||
self.hash168s[hash168].remove(hex_hash)
|
||||
if not self.hash168s[hash168]:
|
||||
del self.hash168s[hash168]
|
||||
touched.update(hash168s)
|
||||
|
||||
# Get the raw transactions for the new hashes. Ignore the
|
||||
# ones the daemon no longer has (it will return None). Put
|
||||
# them into a dictionary of hex hash to deserialized tx.
|
||||
hex_hashes.difference_update(self.txs)
|
||||
raw_txs = await self.bp.daemon.getrawtransactions(hex_hashes)
|
||||
if initial:
|
||||
self.logger.info('analysing {:,d} mempool txs'
|
||||
.format(len(raw_txs)))
|
||||
new_txs = {hex_hash: Deserializer(raw_tx).read_tx()
|
||||
for hex_hash, raw_tx in zip(hex_hashes, raw_txs) if raw_tx}
|
||||
del raw_txs, hex_hashes
|
||||
|
||||
# The mempool is unordered, so process all outputs first so
|
||||
# that looking for inputs has full info.
|
||||
script_hash168 = self.bp.coin.hash168_from_script()
|
||||
db_utxo_lookup = self.bp.db_utxo_lookup
|
||||
|
||||
def txout_pair(txout):
|
||||
return (script_hash168(txout.pk_script), txout.value)
|
||||
|
||||
for n, (hex_hash, tx) in enumerate(new_txs.items()):
|
||||
# Yield to process e.g. signals
|
||||
if n % 100 == 0:
|
||||
await asyncio.sleep(0)
|
||||
txout_pairs = [txout_pair(txout) for txout in tx.outputs]
|
||||
self.txs[hex_hash] = (None, txout_pairs, None)
|
||||
|
||||
def txin_info(txin):
|
||||
hex_hash = hash_to_str(txin.prev_hash)
|
||||
mempool_entry = self.txs.get(hex_hash)
|
||||
if mempool_entry:
|
||||
return mempool_entry[1][txin.prev_idx], True
|
||||
pair = db_utxo_lookup(txin.prev_hash, txin.prev_idx)
|
||||
return pair, False
|
||||
|
||||
if initial:
|
||||
next_log = time.time()
|
||||
self.logger.info('processed outputs, now examining inputs. '
|
||||
'This can take some time...')
|
||||
|
||||
# Now add the inputs
|
||||
for n, (hex_hash, tx) in enumerate(new_txs.items()):
|
||||
# Yield to process e.g. signals
|
||||
if n % 10 == 0:
|
||||
await asyncio.sleep(0)
|
||||
|
||||
if initial and time.time() > next_log:
|
||||
next_log = time.time() + 20
|
||||
self.logger.info('{:,d} done ({:d}%)'
|
||||
.format(n, int(n / len(new_txs) * 100)))
|
||||
|
||||
txout_pairs = self.txs[hex_hash][1]
|
||||
try:
|
||||
infos = (txin_info(txin) for txin in tx.inputs)
|
||||
txin_pairs, unconfs = zip(*infos)
|
||||
except self.bp.MissingUTXOError:
|
||||
# Drop this TX. If other mempool txs depend on it
|
||||
# it's harmless - next time the mempool is refreshed
|
||||
# they'll either be cleaned up or the UTXOs will no
|
||||
# longer be missing.
|
||||
del self.txs[hex_hash]
|
||||
continue
|
||||
self.txs[hex_hash] = (txin_pairs, txout_pairs, any(unconfs))
|
||||
|
||||
# Update touched and self.hash168s for the new tx
|
||||
for hash168, value in txin_pairs:
|
||||
self.hash168s[hash168].add(hex_hash)
|
||||
touched.add(hash168)
|
||||
for hash168, value in txout_pairs:
|
||||
self.hash168s[hash168].add(hex_hash)
|
||||
touched.add(hash168)
|
||||
|
||||
if missing_utxos:
|
||||
self.logger.info('{:,d} txs had missing UTXOs; probably the '
|
||||
'daemon is a block or two ahead of us.'
|
||||
.format(len(missing_utxos)))
|
||||
first = ', '.join('{} / {:,d}'.format(hash_to_str(txin.prev_hash),
|
||||
txin.prev_idx)
|
||||
for txin in sorted(missing_utxos)[:3])
|
||||
self.logger.info('first ones are {}'.format(first))
|
||||
|
||||
self.count += 1
|
||||
if self.count % 25 == 0 or gone:
|
||||
self.count = 0
|
||||
self.logger.info('{:,d} txs touching {:,d} addresses'
|
||||
.format(len(self.txs), len(self.hash168s)))
|
||||
|
||||
# Might include a None
|
||||
return touched
|
||||
|
||||
def transactions(self, hash168):
|
||||
'''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
|
||||
entries for the hash168.
|
||||
|
||||
unconfirmed is True if any txin is unconfirmed.
|
||||
'''
|
||||
for hex_hash in self.hash168s[hash168]:
|
||||
txin_pairs, txout_pairs, unconfirmed = self.txs[hex_hash]
|
||||
tx_fee = (sum(v for hash168, v in txin_pairs)
|
||||
- sum(v for hash168, v in txout_pairs))
|
||||
yield (hex_hash, tx_fee, unconfirmed)
|
||||
|
||||
def value(self, hash168):
|
||||
'''Return the unconfirmed amount in the mempool for hash168.
|
||||
|
||||
Can be positive or negative.
|
||||
'''
|
||||
value = 0
|
||||
for hex_hash in self.hash168s[hash168]:
|
||||
txin_pairs, txout_pairs, unconfirmed = self.txs[hex_hash]
|
||||
value -= sum(v for h168, v in txin_pairs if h168 == hash168)
|
||||
value += sum(v for h168, v in txout_pairs if h168 == hash168)
|
||||
return value
|
||||
|
||||
|
||||
class ServerManager(LoggedClass):
|
||||
'''Manages the servers.'''
|
||||
|
||||
Loading…
Reference in New Issue
Block a user