Move clean_db() to db.py
This commit is contained in:
parent
db5d0dd6cb
commit
8d253c81d3
@ -10,7 +10,6 @@
|
||||
|
||||
import array
|
||||
import asyncio
|
||||
import os
|
||||
from struct import pack, unpack
|
||||
import time
|
||||
from bisect import bisect_left
|
||||
@ -22,7 +21,6 @@ from server.version import VERSION
|
||||
from lib.hash import hash_to_str
|
||||
from lib.util import chunks, formatted_time, LoggedClass
|
||||
import server.db
|
||||
from server.storage import open_db
|
||||
|
||||
|
||||
class ChainError(Exception):
|
||||
@ -152,7 +150,6 @@ class BlockProcessor(server.db.DB):
|
||||
self.utxo_MB = env.utxo_MB
|
||||
self.hist_MB = env.hist_MB
|
||||
self.next_cache_check = 0
|
||||
self.reorg_limit = env.reorg_limit
|
||||
|
||||
# Headers and tx_hashes have one entry per block
|
||||
self.history = defaultdict(partial(array.array, 'I'))
|
||||
@ -171,14 +168,11 @@ class BlockProcessor(server.db.DB):
|
||||
self.db_deletes = []
|
||||
|
||||
# Log state
|
||||
self.logger.info('reorg limit is {:,d} blocks'
|
||||
.format(self.reorg_limit))
|
||||
if self.first_sync:
|
||||
self.logger.info('flushing UTXO cache at {:,d} MB'
|
||||
.format(self.utxo_MB))
|
||||
self.logger.info('flushing history cache at {:,d} MB'
|
||||
.format(self.hist_MB))
|
||||
self.clean_db()
|
||||
|
||||
async def main_loop(self):
|
||||
'''Main loop for block processing.'''
|
||||
@ -295,52 +289,6 @@ class BlockProcessor(server.db.DB):
|
||||
|
||||
return self.fs_block_hashes(start, count)
|
||||
|
||||
def clean_db(self):
|
||||
'''Clean out stale DB items.
|
||||
|
||||
Stale DB items are excess history flushed since the most
|
||||
recent UTXO flush (only happens on unclean shutdown), and aged
|
||||
undo information.
|
||||
'''
|
||||
if self.flush_count < self.utxo_flush_count:
|
||||
raise ChainError('DB corrupt: flush_count < utxo_flush_count')
|
||||
with self.db.write_batch() as batch:
|
||||
if self.flush_count > self.utxo_flush_count:
|
||||
self.logger.info('DB shut down uncleanly. Scanning for '
|
||||
'excess history flushes...')
|
||||
self.remove_excess_history(batch)
|
||||
self.utxo_flush_count = self.flush_count
|
||||
self.remove_stale_undo_items(batch)
|
||||
self.flush_state(batch)
|
||||
|
||||
def remove_excess_history(self, batch):
|
||||
prefix = b'H'
|
||||
keys = []
|
||||
for key, hist in self.db.iterator(prefix=prefix):
|
||||
flush_id, = unpack('>H', key[-2:])
|
||||
if flush_id > self.utxo_flush_count:
|
||||
keys.append(key)
|
||||
|
||||
self.logger.info('deleting {:,d} history entries'
|
||||
.format(len(keys)))
|
||||
for key in keys:
|
||||
batch.delete(key)
|
||||
|
||||
def remove_stale_undo_items(self, batch):
|
||||
prefix = b'U'
|
||||
cutoff = self.db_height - self.reorg_limit
|
||||
keys = []
|
||||
for key, hist in self.db.iterator(prefix=prefix):
|
||||
height, = unpack('>I', key[-4:])
|
||||
if height > cutoff:
|
||||
break
|
||||
keys.append(key)
|
||||
|
||||
self.logger.info('deleting {:,d} stale undo entries'
|
||||
.format(len(keys)))
|
||||
for key in keys:
|
||||
batch.delete(key)
|
||||
|
||||
def flush_state(self, batch):
|
||||
'''Flush chain state to the batch.'''
|
||||
now = time.time()
|
||||
@ -537,7 +485,7 @@ class BlockProcessor(server.db.DB):
|
||||
self.tip = self.coin.header_hash(header)
|
||||
self.height += 1
|
||||
undo_info = self.advance_txs(tx_hashes, txs, touched)
|
||||
if self.daemon.cached_height() - self.height <= self.reorg_limit:
|
||||
if self.daemon.cached_height() - self.height <= self.env.reorg_limit:
|
||||
self.write_undo_info(self.height, b''.join(undo_info))
|
||||
|
||||
def advance_txs(self, tx_hashes, txs, touched):
|
||||
|
||||
55
server/db.py
55
server/db.py
@ -47,6 +47,8 @@ class DB(LoggedClass):
|
||||
self.logger.info('switching current directory to {}'
|
||||
.format(env.db_dir))
|
||||
os.chdir(env.db_dir)
|
||||
self.logger.info('reorg limit is {:,d} blocks'
|
||||
.format(self.env.reorg_limit))
|
||||
|
||||
# Open DB and metadata files. Record some of its state.
|
||||
db_name = '{}-{}'.format(self.coin.NAME, self.coin.NET)
|
||||
@ -73,6 +75,7 @@ class DB(LoggedClass):
|
||||
assert self.db_tx_count == self.tx_counts[-1]
|
||||
else:
|
||||
assert self.db_tx_count == 0
|
||||
self.clean_db()
|
||||
|
||||
def read_state(self):
|
||||
if self.db.is_new:
|
||||
@ -117,6 +120,8 @@ class DB(LoggedClass):
|
||||
if self.first_sync:
|
||||
self.logger.info('sync time so far: {}'
|
||||
.format(formatted_time(self.wall_time)))
|
||||
if self.flush_count < self.utxo_flush_count:
|
||||
raise self.DBError('DB corrupt: flush_count < utxo_flush_count')
|
||||
|
||||
def write_state(self, batch):
|
||||
'''Write chain state to the batch.'''
|
||||
@ -133,6 +138,56 @@ class DB(LoggedClass):
|
||||
}
|
||||
batch.put(b'state', repr(state).encode())
|
||||
|
||||
def clean_db(self):
|
||||
'''Clean out stale DB items.
|
||||
|
||||
Stale DB items are excess history flushed since the most
|
||||
recent UTXO flush (only happens on unclean shutdown), and aged
|
||||
undo information.
|
||||
'''
|
||||
if self.flush_count > self.utxo_flush_count:
|
||||
self.utxo_flush_count = self.flush_count
|
||||
self.logger.info('DB shut down uncleanly. Scanning for '
|
||||
'excess history flushes...')
|
||||
history_keys = self.excess_history_keys()
|
||||
self.logger.info('deleting {:,d} history entries'
|
||||
.format(len(history_keys)))
|
||||
else:
|
||||
history_keys = []
|
||||
|
||||
undo_keys = self.stale_undo_keys()
|
||||
if undo_keys:
|
||||
self.logger.info('deleting {:,d} stale undo entries'
|
||||
.format(len(undo_keys)))
|
||||
|
||||
with self.db.write_batch() as batch:
|
||||
batch_delete = batch.delete
|
||||
for key in history_keys:
|
||||
batch_delete(key)
|
||||
for key in undo_keys:
|
||||
batch_delete(key)
|
||||
self.write_state(batch)
|
||||
|
||||
def excess_history_keys(self):
|
||||
prefix = b'H'
|
||||
keys = []
|
||||
for key, hist in self.db.iterator(prefix=prefix):
|
||||
flush_id, = unpack('>H', key[-2:])
|
||||
if flush_id > self.utxo_flush_count:
|
||||
keys.append(key)
|
||||
return keys
|
||||
|
||||
def stale_undo_keys(self):
|
||||
prefix = b'U'
|
||||
cutoff = self.db_height - self.env.reorg_limit
|
||||
keys = []
|
||||
for key, hist in self.db.iterator(prefix=prefix):
|
||||
height, = unpack('>I', key[-4:])
|
||||
if height > cutoff:
|
||||
break
|
||||
keys.append(key)
|
||||
return keys
|
||||
|
||||
def open_file(self, filename, create=False):
|
||||
'''Open the file name. Return its handle.'''
|
||||
try:
|
||||
|
||||
Loading…
Reference in New Issue
Block a user