Merge branch 'txnum-in-h' into develop
This commit is contained in:
commit
e9820b3933
@ -353,9 +353,8 @@ class BlockProcessor(server.db.DB):
|
||||
|
||||
# UTXO cache
|
||||
self.utxo_cache = {}
|
||||
self.db_cache = {}
|
||||
self.utxo_cache_spends = 0
|
||||
self.db_deletes = 0
|
||||
self.db_deletes = []
|
||||
|
||||
# Log state
|
||||
self.logger.info('{}/{} height: {:,d} tx count: {:,d} '
|
||||
@ -540,7 +539,7 @@ class BlockProcessor(server.db.DB):
|
||||
assert self.height == self.fs_height == self.db_height
|
||||
assert not self.history
|
||||
assert not self.utxo_cache
|
||||
assert not self.db_cache
|
||||
assert not self.db_deletes
|
||||
|
||||
def flush(self, flush_utxos=False, flush_history=None):
|
||||
'''Flush out cached state.
|
||||
@ -708,15 +707,16 @@ class BlockProcessor(server.db.DB):
|
||||
# more, so we scale our already bloated object sizes.
|
||||
one_MB = int(1048576 / 1.3)
|
||||
utxo_cache_size = len(self.utxo_cache) * 187
|
||||
db_cache_size = len(self.db_cache) * 105
|
||||
db_deletes_size = len(self.db_deletes) * 61
|
||||
hist_cache_size = len(self.history) * 180 + self.history_size * 4
|
||||
tx_hash_size = (self.tx_count - self.fs_tx_count) * 74
|
||||
utxo_MB = (db_cache_size + utxo_cache_size) // one_MB
|
||||
utxo_MB = (db_deletes_size + utxo_cache_size) // one_MB
|
||||
hist_MB = (hist_cache_size + tx_hash_size) // one_MB
|
||||
|
||||
self.logger.info('UTXOs: {:,d} deletes: {:,d} '
|
||||
'UTXOs {:,d}MB hist {:,d}MB'
|
||||
.format(len(self.utxo_cache), self.db_deletes,
|
||||
.format(len(self.utxo_cache),
|
||||
len(self.db_deletes) // 2,
|
||||
utxo_MB, hist_MB))
|
||||
self.logger.info('our height: {:,d} daemon height: {:,d}'
|
||||
.format(self.height, self.daemon.cached_height()))
|
||||
@ -915,17 +915,18 @@ class BlockProcessor(server.db.DB):
|
||||
|
||||
To this end we maintain two "tables", one for each point above:
|
||||
|
||||
1. Key: b'u' + address_hash168 + tx_num + tx_idx
|
||||
1. Key: b'u' + address_hash168 + tx_idx + tx_num
|
||||
Value: the UTXO value as a 64-bit unsigned integer
|
||||
|
||||
2. Key: b'h' + compressed_tx_hash + tx_idx
|
||||
Value: [address_hash168 + tx_num]
|
||||
2. Key: b'h' + compressed_tx_hash + tx_idx + tx_num
|
||||
Value: hash168
|
||||
|
||||
The compressed tx hash is just the first few bytes of the hash of
|
||||
the tx in which the UTXO was created. As this is not unique there
|
||||
will are potential collisions when saving and looking up UTXOs;
|
||||
hence why the second table has a list as its value. The collision
|
||||
can be resolved with the tx_num. The collision rate is low (<0.1%).
|
||||
will be potential collisions so tx_num is also in the key. When
|
||||
looking up a UTXO the prefix space of the compressed hash needs to
|
||||
be searched and resolved if necessary with the tx_num. The
|
||||
collision rate is low (<0.1%).
|
||||
'''
|
||||
|
||||
def spend_utxo(self, tx_hash, tx_idx):
|
||||
@ -942,55 +943,36 @@ class BlockProcessor(server.db.DB):
|
||||
self.utxo_cache_spends += 1
|
||||
return cache_value
|
||||
|
||||
# Spend it from the DB. Read the UTXO through the cache
|
||||
# because compressed keys can collide.
|
||||
# The 4 is the COMPRESSED_TX_HASH_LEN
|
||||
db_key = b'h' + tx_hash[:4] + idx_packed
|
||||
db_value = self.db_cache_get(db_key)
|
||||
if db_value:
|
||||
# FIXME: this matches what we did previously but until we store
|
||||
# all UTXOs isn't safe
|
||||
if len(db_value) == 25:
|
||||
udb_key = b'u' + db_value + idx_packed
|
||||
utxo_value_packed = self.db.get(udb_key)
|
||||
if utxo_value_packed:
|
||||
# Remove the UTXO from both tables
|
||||
self.db_deletes += 1
|
||||
self.db_cache[db_key] = None
|
||||
self.db_cache[udb_key] = None
|
||||
return db_value + utxo_value_packed
|
||||
# Fall through to below loop for error
|
||||
# Spend it from the DB.
|
||||
|
||||
assert len(db_value) % 25 == 0
|
||||
# Key: b'h' + compressed_tx_hash + tx_idx + tx_num
|
||||
# Value: hash168
|
||||
prefix = b'h' + tx_hash[:4] + idx_packed
|
||||
candidates = {db_key: hash168 for db_key, hash168
|
||||
in self.db.iterator(prefix=prefix)}
|
||||
|
||||
# Find which entry, if any, the TX_HASH matches.
|
||||
for n in range(0, len(db_value), 25):
|
||||
tx_num, = unpack('<I', db_value[n + 21:n + 25])
|
||||
for hdb_key, hash168 in candidates.items():
|
||||
tx_num_packed = hdb_key[-4:]
|
||||
|
||||
if len(candidates) > 1:
|
||||
tx_num, = unpack('<I', tx_num_packed)
|
||||
hash, height = self.get_tx_hash(tx_num)
|
||||
if hash == tx_hash:
|
||||
match = db_value[n:n+25]
|
||||
udb_key = b'u' + match + idx_packed
|
||||
utxo_value_packed = self.db.get(udb_key)
|
||||
if utxo_value_packed:
|
||||
# Remove the UTXO from both tables
|
||||
self.db_deletes += 1
|
||||
self.db_cache[db_key] = db_value[:n] + db_value[n+25:]
|
||||
self.db_cache[udb_key] = None
|
||||
return match + utxo_value_packed
|
||||
if hash != tx_hash:
|
||||
continue
|
||||
|
||||
raise self.DBError('UTXO {} / {:,d} not found in "u" table'
|
||||
.format(hash_to_str(tx_hash), tx_idx))
|
||||
# Key: b'u' + address_hash168 + tx_idx + tx_num
|
||||
# Value: the UTXO value as a 64-bit unsigned integer
|
||||
udb_key = b'u' + hash168 + hdb_key[-6:]
|
||||
utxo_value_packed = self.db.get(udb_key)
|
||||
if utxo_value_packed:
|
||||
# Remove both entries for this UTXO
|
||||
self.db_deletes.append(hdb_key)
|
||||
self.db_deletes.append(udb_key)
|
||||
return hash168 + tx_num_packed + utxo_value_packed
|
||||
|
||||
raise ChainError('UTXO {} / {:,d} not found in "h" table'
|
||||
.format(hash_to_str(tx_hash), tx_idx))
|
||||
|
||||
def db_cache_get(self, key):
|
||||
'''Fetch a 'h' value from the DB through our write cache.'''
|
||||
value = self.db_cache.get(key)
|
||||
if value:
|
||||
return value
|
||||
return self.db.get(key)
|
||||
|
||||
def flush_utxos(self, batch):
|
||||
'''Flush the cached DB writes and UTXO set to the batch.'''
|
||||
# Care is needed because the writes generated by flushing the
|
||||
@ -1004,34 +986,24 @@ class BlockProcessor(server.db.DB):
|
||||
'DB spends: {:,d}'
|
||||
.format(len(self.utxo_cache) + self.utxo_cache_spends,
|
||||
self.utxo_cache_spends,
|
||||
self.db_deletes))
|
||||
len(self.db_deletes) // 2))
|
||||
|
||||
batch_delete = batch.delete
|
||||
for key in self.db_deletes:
|
||||
batch_delete(key)
|
||||
self.db_deletes = []
|
||||
|
||||
batch_put = batch.put
|
||||
for cache_key, cache_value in self.utxo_cache.items():
|
||||
# Frist write to the hash168 lookup table
|
||||
# The 4 is the COMPRESSED_TX_HASH_LEN
|
||||
db_key = b'h' + cache_key[:4] + cache_key[-2:]
|
||||
prior_value = self.db_cache_get(db_key)
|
||||
if prior_value: # Should rarely happen
|
||||
self.db_cache[db_key] = prior_value + cache_value[:25]
|
||||
else:
|
||||
self.db_cache[db_key] = cache_value[:25]
|
||||
# suffix = tx_num + tx_idx
|
||||
hash168 = cache_value[:21]
|
||||
suffix = cache_key[-2:] + cache_value[21:25]
|
||||
batch_put(b'h' + cache_key[:4] + suffix, hash168)
|
||||
batch_put(b'u' + hash168 + suffix, cache_value[25:])
|
||||
|
||||
# Next write the UTXO table
|
||||
db_key = b'u' + cache_value[:25] + cache_key[-2:]
|
||||
self.db_cache[db_key] = cache_value[-8:]
|
||||
|
||||
# GC-ing this now can only help the levelDB write.
|
||||
self.utxo_cache = {}
|
||||
|
||||
# Now we can update to the batch.
|
||||
for key, value in self.db_cache.items():
|
||||
if value:
|
||||
batch.put(key, value)
|
||||
else: # b'' or None
|
||||
batch.delete(key)
|
||||
|
||||
self.db_cache = {}
|
||||
self.utxo_cache_spends = self.db_deletes = 0
|
||||
self.db_deletes = []
|
||||
self.utxo_cache_spends = 0
|
||||
self.utxo_flush_count = self.flush_count
|
||||
self.db_tx_count = self.tx_count
|
||||
self.db_height = self.height
|
||||
|
||||
36
server/db.py
36
server/db.py
@ -29,7 +29,7 @@ class DB(LoggedClass):
|
||||
it was shutdown uncleanly.
|
||||
'''
|
||||
|
||||
VERSIONS = [2]
|
||||
VERSIONS = [3]
|
||||
|
||||
class MissingUTXOError(Exception):
|
||||
'''Raised if a mempool tx input UTXO couldn't be found.'''
|
||||
@ -198,12 +198,14 @@ class DB(LoggedClass):
|
||||
'''
|
||||
limit = self._resolve_limit(limit)
|
||||
s_unpack = unpack
|
||||
# Key: b'u' + address_hash168 + tx_idx + tx_num
|
||||
# Value: the UTXO value as a 64-bit unsigned integer
|
||||
prefix = b'u' + hash168
|
||||
for db_key, db_value in self.db.iterator(prefix=prefix):
|
||||
if limit == 0:
|
||||
return
|
||||
limit -= 1
|
||||
tx_num, tx_pos = s_unpack('<IH', db_key[-6:])
|
||||
tx_num, tx_pos = s_unpack('<HI', db_key[-6:])
|
||||
value, = unpack('<Q', db_value)
|
||||
tx_hash, height = self.fs_tx_hash(tx_num)
|
||||
yield UTXO(tx_num, tx_pos, tx_hash, height, value)
|
||||
@ -223,19 +225,19 @@ class DB(LoggedClass):
|
||||
'''Return (hash168, tx_num_packed) for the given TXO.
|
||||
|
||||
Both are None if not found.'''
|
||||
# The 4 is the COMPRESSED_TX_HASH_LEN
|
||||
key = b'h' + tx_hash[:4] + idx_packed
|
||||
db_value = self.db.get(key)
|
||||
if db_value:
|
||||
assert len(db_value) % 25 == 0
|
||||
# Key: b'h' + compressed_tx_hash + tx_idx + tx_num
|
||||
# Value: hash168
|
||||
prefix = b'h' + tx_hash[:4] + idx_packed
|
||||
|
||||
# Find which entry, if any, the TX_HASH matches.
|
||||
for n in range(0, len(db_value), 25):
|
||||
tx_num_packed = db_value[n + 21: n + 25]
|
||||
tx_num, = unpack('<I', tx_num_packed)
|
||||
hash, height = self.fs_tx_hash(tx_num)
|
||||
if hash == tx_hash:
|
||||
return db_value[n:n+21], tx_num_packed
|
||||
# Find which entry, if any, the TX_HASH matches.
|
||||
for db_key, hash168 in self.db.iterator(prefix=prefix):
|
||||
assert len(hash168) == 21
|
||||
|
||||
tx_num_packed = db_key[-4:]
|
||||
tx_num, = unpack('<I', tx_num_packed)
|
||||
hash, height = self.fs_tx_hash(tx_num)
|
||||
if hash == tx_hash:
|
||||
return hash168, tx_num_packed
|
||||
|
||||
return None, None
|
||||
|
||||
@ -249,10 +251,12 @@ class DB(LoggedClass):
|
||||
hash168, tx_num_packed = self.db_hash168(tx_hash, idx_packed)
|
||||
if not hash168:
|
||||
# This can happen when the daemon is a block ahead of us
|
||||
# and has mempool txs spending new txs in that block
|
||||
# and has mempool txs spending outputs from that new block
|
||||
raise self.MissingUTXOError
|
||||
|
||||
key = b'u' + hash168 + tx_num_packed + idx_packed
|
||||
# Key: b'u' + address_hash168 + tx_idx + tx_num
|
||||
# Value: the UTXO value as a 64-bit unsigned integer
|
||||
key = b'u' + hash168 + idx_packed + tx_num_packed
|
||||
db_value = self.db.get(key)
|
||||
if not db_value:
|
||||
raise self.DBError('UTXO {} / {:,d} in one table only'
|
||||
|
||||
Loading…
Reference in New Issue
Block a user