Recovery from excess history flushes
This commit is contained in:
parent
cbe1ef60ca
commit
34096a02e9
75
server/db.py
75
server/db.py
@ -119,9 +119,14 @@ class UTXOCache(object):
|
||||
|
||||
hash168s.add(hash168)
|
||||
key = tx_hash + pack('<H', idx)
|
||||
if key in self.cache:
|
||||
self.logger.info('duplicate tx hash {}'
|
||||
.format(bytes(reversed(tx_hash)).hex()))
|
||||
|
||||
# Well-known duplicate coinbases from heights 91722-91880
|
||||
# that destoyed 100 BTC forever:
|
||||
# e3bf3d07d4b0375638d5f1db5255fe07ba2c4cb067cd81b84ee974b6585fb468
|
||||
# d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599
|
||||
#if key in self.cache:
|
||||
# self.logger.info('duplicate tx hash {}'
|
||||
# .format(bytes(reversed(tx_hash)).hex()))
|
||||
|
||||
# b''.join avoids this: https://bugs.python.org/issue13298
|
||||
self.cache[key] = b''.join(
|
||||
@ -293,8 +298,8 @@ class DB(object):
|
||||
self.coin = env.coin
|
||||
|
||||
# Chain state (initialize to genesis in case of new DB)
|
||||
self.height = -1
|
||||
self.tx_count = 0
|
||||
self.db_height = -1
|
||||
self.db_tx_count = 0
|
||||
self.flush_count = 0
|
||||
self.utxo_flush_count = 0
|
||||
self.wall_time = 0
|
||||
@ -302,8 +307,8 @@ class DB(object):
|
||||
|
||||
# Open DB and metadata files. Record some of its state.
|
||||
self.db = self.open_db(self.coin)
|
||||
self.db_tx_count = self.tx_count
|
||||
self.db_height = self.height
|
||||
self.tx_count = self.db_tx_count
|
||||
self.height = self.db_height
|
||||
|
||||
# Caches to be flushed later. Headers and tx_hashes have one
|
||||
# entry per block
|
||||
@ -313,6 +318,7 @@ class DB(object):
|
||||
self.history_size = 0
|
||||
self.utxo_cache = UTXOCache(self, self.db, self.coin)
|
||||
self.tx_counts = array.array('I')
|
||||
|
||||
self.txcount_file.seek(0)
|
||||
self.tx_counts.fromfile(self.txcount_file, self.height + 1)
|
||||
|
||||
@ -329,6 +335,7 @@ class DB(object):
|
||||
self.logger.info('flushing history cache at {:,d} MB'
|
||||
.format(self.hist_MB))
|
||||
|
||||
|
||||
def open_db(self, coin):
|
||||
db_name = '{}-{}'.format(coin.NAME, coin.NET)
|
||||
is_new = False
|
||||
@ -346,7 +353,7 @@ class DB(object):
|
||||
else:
|
||||
self.logger.info('successfully opened database {}'.format(db_name))
|
||||
self.read_state(db)
|
||||
assert self.flush_count == self.utxo_flush_count # FIXME
|
||||
self.delete_excess_history(db)
|
||||
|
||||
self.headers_file = self.open_file('headers', is_new)
|
||||
self.txcount_file = self.open_file('txcount', is_new)
|
||||
@ -360,13 +367,41 @@ class DB(object):
|
||||
raise self.Error('DB genesis hash {} does not match coin {}'
|
||||
.format(state['genesis_hash'],
|
||||
self.coin.GENESIS_HASH))
|
||||
self.height = state['height']
|
||||
self.tx_count = state['tx_count']
|
||||
self.db_height = state['height']
|
||||
self.db_tx_count = state['tx_count']
|
||||
self.tip = state['tip']
|
||||
self.flush_count = state['flush_count']
|
||||
self.utxo_flush_count = state['utxo_flush_count']
|
||||
self.wall_time = state['wall_time']
|
||||
self.last_flush_tx_count = self.tx_count
|
||||
self.last_flush_tx_count = self.db_tx_count
|
||||
self.last_flush = time.time()
|
||||
|
||||
def delete_excess_history(self, db):
|
||||
'''Clear history flushed since the most recent UTXO flush.'''
|
||||
utxo_flush_count = self.utxo_flush_count
|
||||
diff = self.flush_count - utxo_flush_count
|
||||
if diff == 0:
|
||||
return
|
||||
if diff < 0:
|
||||
raise self.Error('DB corrupt: flush_count < utxo_flush_count')
|
||||
|
||||
self.logger.info('DB not shut down cleanly. Scanning for most '
|
||||
'recent {:,d} history flushes'.format(diff))
|
||||
prefix = b'H'
|
||||
unpack = struct.unpack
|
||||
keys = []
|
||||
for key, hist in db.iterator(prefix=prefix):
|
||||
flush_id, = unpack('>H', key[-2:])
|
||||
if flush_id > self.utxo_flush_count:
|
||||
keys.append(key)
|
||||
|
||||
self.logger.info('deleting {:,d} history entries'.format(len(keys)))
|
||||
with db.write_batch(transaction=True) as batch:
|
||||
for key in keys:
|
||||
db.delete(key)
|
||||
self.utxo_flush_count = self.flush_count
|
||||
self.flush_state(batch)
|
||||
self.logger.info('deletion complete')
|
||||
|
||||
def flush_state(self, batch):
|
||||
'''Flush chain state to the batch.'''
|
||||
@ -375,8 +410,8 @@ class DB(object):
|
||||
self.last_flush = now
|
||||
state = {
|
||||
'genesis': self.coin.GENESIS_HASH,
|
||||
'height': self.height,
|
||||
'tx_count': self.tx_count,
|
||||
'height': self.db_height,
|
||||
'tx_count': self.db_tx_count,
|
||||
'tip': self.tip,
|
||||
'flush_count': self.flush_count,
|
||||
'utxo_flush_count': self.utxo_flush_count,
|
||||
@ -405,18 +440,16 @@ class DB(object):
|
||||
self.logger.info('commencing history flush')
|
||||
|
||||
with self.db.write_batch(transaction=True) as batch:
|
||||
# History first - fast and frees memory
|
||||
self.flush_history(batch)
|
||||
if flush_utxos:
|
||||
self.utxo_cache.flush(batch)
|
||||
self.utxo_flush_count = self.flush_count + 1
|
||||
self.flush_history(batch)
|
||||
self.utxo_flush_count = self.flush_count
|
||||
self.db_tx_count = self.tx_count
|
||||
self.db_height = self.height
|
||||
self.flush_state(batch)
|
||||
self.logger.info('committing transaction...')
|
||||
|
||||
# The flush succeeded, so update our record of DB state
|
||||
if flush_utxos:
|
||||
self.db_tx_count = self.tx_count
|
||||
self.db_height = self.height
|
||||
|
||||
# Update and put the wall time again - otherwise we drop the
|
||||
# time it took leveldb to commit the batch
|
||||
self.flush_state(self.db)
|
||||
@ -531,7 +564,7 @@ class DB(object):
|
||||
self.logger.info('cache stats at height {:,d} daemon height: {:,d}'
|
||||
.format(self.height, daemon_height))
|
||||
self.logger.info(' entries: UTXO: {:,d} DB: {:,d} '
|
||||
'hist count: {:,d} hist size: {:,d}'
|
||||
'hist addrs: {:,d} hist size: {:,d}'
|
||||
.format(len(self.utxo_cache.cache),
|
||||
len(self.utxo_cache.db_cache),
|
||||
len(self.history),
|
||||
|
||||
Loading…
Reference in New Issue
Block a user