Clean up db initialization and state writing
This commit is contained in:
parent
9f9db0c7bd
commit
c7f930a18a
185
server/db.py
185
server/db.py
@ -2,6 +2,7 @@
|
||||
# and warranty status of this software.
|
||||
|
||||
import array
|
||||
import ast
|
||||
import itertools
|
||||
import os
|
||||
import struct
|
||||
@ -275,13 +276,6 @@ class UTXOCache(object):
|
||||
|
||||
class DB(object):
|
||||
|
||||
HEIGHT_KEY = b'height'
|
||||
TIP_KEY = b'tip'
|
||||
GENESIS_KEY = b'genesis'
|
||||
TX_COUNT_KEY = b'tx_count'
|
||||
FLUSH_COUNT_KEY = b'flush_count'
|
||||
WALL_TIME_KEY = b'wall_time'
|
||||
|
||||
class Error(Exception):
|
||||
pass
|
||||
|
||||
@ -289,75 +283,90 @@ class DB(object):
|
||||
self.logger = logging.getLogger('DB')
|
||||
self.logger.setLevel(logging.INFO)
|
||||
|
||||
self.coin = env.coin
|
||||
# Meta
|
||||
self.tx_hash_file_size = 16 * 1024 * 1024
|
||||
self.flush_MB = env.flush_MB
|
||||
self.next_cache_check = 0
|
||||
self.logger.info('flushing after cache reaches {:,d} MB'
|
||||
.format(self.flush_MB))
|
||||
self.last_flush = time.time()
|
||||
self.coin = env.coin
|
||||
|
||||
self.tx_counts = array.array('I')
|
||||
self.tx_hash_file_size = 4*1024*1024
|
||||
# Unflushed items. Headers and tx_hashes have one entry per block
|
||||
# Chain state (initialize to genesis in case of new DB)
|
||||
self.height = -1
|
||||
self.tx_count = 0
|
||||
self.flush_count = 0
|
||||
self.wall_time = 0
|
||||
self.tip = self.coin.GENESIS_HASH
|
||||
|
||||
# Open DB and metadata files. Record some of its state.
|
||||
self.db = self.open_db(self.coin)
|
||||
self.db_tx_count = self.tx_count
|
||||
self.db_height = self.height
|
||||
|
||||
# Caches to be flushed later. Headers and tx_hashes have one
|
||||
# entry per block
|
||||
self.headers = []
|
||||
self.tx_hashes = []
|
||||
self.history = defaultdict(partial(array.array, 'I'))
|
||||
self.history_size = 0
|
||||
|
||||
db_name = '{}-{}'.format(self.coin.NAME, self.coin.NET)
|
||||
try:
|
||||
self.db = self.open_db(db_name, False)
|
||||
except:
|
||||
self.db = self.open_db(db_name, True)
|
||||
self.headers_file = self.open_file('headers', True)
|
||||
self.txcount_file = self.open_file('txcount', True)
|
||||
self.init_db()
|
||||
self.logger.info('created new database {}'.format(db_name))
|
||||
else:
|
||||
self.logger.info('successfully opened database {}'.format(db_name))
|
||||
self.headers_file = self.open_file('headers')
|
||||
self.txcount_file = self.open_file('txcount')
|
||||
self.read_db()
|
||||
|
||||
self.utxo_cache = UTXOCache(self, self.db, self.coin)
|
||||
self.tx_counts = array.array('I')
|
||||
self.txcount_file.seek(0)
|
||||
self.tx_counts.fromfile(self.txcount_file, self.height + 1)
|
||||
|
||||
# Note that DB_HEIGHT is the height of the next block to be written.
|
||||
# So an empty DB has a DB_HEIGHT of 0 not -1.
|
||||
self.tx_count = self.db_tx_count
|
||||
self.height = self.db_height - 1
|
||||
self.tx_counts.fromfile(self.txcount_file, self.db_height)
|
||||
self.last_flush = time.time()
|
||||
# FIXME: this sucks and causes issues with exceptions in init_db()
|
||||
if self.tx_count == 0:
|
||||
self.flush()
|
||||
|
||||
def open_db(self, db_name, create):
|
||||
return plyvel.DB(db_name, create_if_missing=create,
|
||||
error_if_exists=create, compression=None)
|
||||
|
||||
def init_db(self):
|
||||
self.db_height = 0
|
||||
self.db_tx_count = 0
|
||||
self.flush_count = 0
|
||||
self.wall_time = 0
|
||||
self.tip = self.coin.GENESIS_HASH
|
||||
self.db.put(self.GENESIS_KEY, unhexlify(self.tip))
|
||||
|
||||
def read_db(self):
|
||||
db = self.db
|
||||
genesis_hash = hexlify(db.get(self.GENESIS_KEY))
|
||||
if genesis_hash != self.coin.GENESIS_HASH:
|
||||
raise self.Error('DB genesis hash {} does not match coin {}'
|
||||
.format(genesis_hash, self.coin.GENESIS_HASH))
|
||||
self.db_height = from_4_bytes(db.get(self.HEIGHT_KEY))
|
||||
self.db_tx_count = from_4_bytes(db.get(self.TX_COUNT_KEY))
|
||||
self.flush_count = from_4_bytes(db.get(self.FLUSH_COUNT_KEY))
|
||||
self.wall_time = from_4_bytes(db.get(self.WALL_TIME_KEY))
|
||||
self.tip = hexlify(db.get(self.TIP_KEY))
|
||||
# Log state
|
||||
self.logger.info('{}/{} height: {:,d} tx count: {:,d} '
|
||||
'flush count: {:,d} sync time: {}'
|
||||
.format(self.coin.NAME, self.coin.NET,
|
||||
self.db_height - 1, self.db_tx_count,
|
||||
self.height, self.tx_count,
|
||||
self.flush_count, self.formatted_wall_time()))
|
||||
self.logger.info('flushing after cache reaches {:,d} MB'
|
||||
.format(self.flush_MB))
|
||||
|
||||
def open_db(self, coin):
|
||||
self.headers_file = self.open_file('headers', True)
|
||||
self.txcount_file = self.open_file('txcount', True)
|
||||
is_new = self.headers_file.seek(0, 2) == 0
|
||||
if is_new != (self.txcount_file.seek(0, 2) == 0):
|
||||
raise self.Error('just one metadata file is zero-length')
|
||||
|
||||
db_name = '{}-{}'.format(coin.NAME, coin.NET)
|
||||
db = plyvel.DB(db_name, create_if_missing=is_new,
|
||||
error_if_exists=is_new, compression=None)
|
||||
if is_new:
|
||||
self.logger.info('created new database {}'.format(db_name))
|
||||
self.flush_state(db)
|
||||
else:
|
||||
self.logger.info('successfully opened database {}'.format(db_name))
|
||||
self.read_state(db)
|
||||
return db
|
||||
|
||||
def flush_state(self, batch):
|
||||
'''Flush chain state to the batch.'''
|
||||
now = time.time()
|
||||
self.wall_time += now - self.last_flush
|
||||
self.last_flush = now
|
||||
state = {
|
||||
'genesis': self.coin.GENESIS_HASH,
|
||||
'height': self.height,
|
||||
'tx_count': self.tx_count,
|
||||
'tip': self.tip,
|
||||
'flush_count': self.flush_count,
|
||||
'wall_time': self.wall_time,
|
||||
}
|
||||
batch.put(b'state', repr(state).encode('ascii'))
|
||||
|
||||
def read_state(self, db):
|
||||
state = db.get(b'state')
|
||||
state = ast.literal_eval(state.decode('ascii'))
|
||||
if state['genesis'] != self.coin.GENESIS_HASH:
|
||||
raise self.Error('DB genesis hash {} does not match coin {}'
|
||||
.format(state['genesis_hash'],
|
||||
self.coin.GENESIS_HASH))
|
||||
self.height = state['height']
|
||||
self.tx_count = state['tx_count']
|
||||
self.tip = state['tip']
|
||||
self.flush_count = state['flush_count']
|
||||
self.wall_time = state['wall_time']
|
||||
|
||||
def formatted_wall_time(self):
|
||||
wall_time = int(self.wall_time)
|
||||
@ -365,12 +374,12 @@ class DB(object):
|
||||
wall_time // 86400, (wall_time % 86400) // 3600,
|
||||
(wall_time % 3600) // 60, wall_time % 60)
|
||||
|
||||
def flush(self):
|
||||
def flush_all(self):
|
||||
'''Flush out all cached state.'''
|
||||
flush_start = time.time()
|
||||
last_flush = self.last_flush
|
||||
tx_diff = self.tx_count - self.db_tx_count
|
||||
height_diff = self.height + 1 - self.db_height
|
||||
height_diff = self.height - self.db_height
|
||||
self.logger.info('starting flush {:,d} txs and {:,d} blocks'
|
||||
.format(tx_diff, height_diff))
|
||||
|
||||
@ -386,9 +395,13 @@ class DB(object):
|
||||
self.flush_state(batch)
|
||||
self.logger.info('committing transaction...')
|
||||
|
||||
# The flush succeeded, so update our record of DB state
|
||||
self.db_tx_count = self.tx_count
|
||||
self.db_height = self.height
|
||||
|
||||
# Update and put the wall time again - otherwise we drop the
|
||||
# time it takes leveldb to commit the batch
|
||||
self.update_wall_time(self.db)
|
||||
# time it took leveldb to commit the batch
|
||||
self.flush_state(self.db)
|
||||
|
||||
flush_time = int(self.last_flush - flush_start)
|
||||
self.logger.info('flushed in {:,d}s to height {:,d} tx count {:,d} '
|
||||
@ -410,27 +423,11 @@ class DB(object):
|
||||
self.write_tx_hashes()
|
||||
os.sync()
|
||||
|
||||
def update_wall_time(self, dest):
|
||||
'''Put the wall time to dest - a DB or batch.'''
|
||||
now = time.time()
|
||||
self.wall_time += now - self.last_flush
|
||||
self.last_flush = now
|
||||
dest.put(self.WALL_TIME_KEY, to_4_bytes(int(self.wall_time)))
|
||||
|
||||
def flush_state(self, batch):
|
||||
self.db_tx_count = self.tx_count
|
||||
self.db_height = self.height + 1
|
||||
batch.put(self.HEIGHT_KEY, to_4_bytes(self.db_height))
|
||||
batch.put(self.TX_COUNT_KEY, to_4_bytes(self.db_tx_count))
|
||||
batch.put(self.FLUSH_COUNT_KEY, to_4_bytes(self.flush_count))
|
||||
batch.put(self.TIP_KEY, unhexlify(self.tip))
|
||||
self.update_wall_time(batch)
|
||||
self.flush_count += 1
|
||||
|
||||
def flush_history(self, batch):
|
||||
# Drop any None entry
|
||||
self.history.pop(None, None)
|
||||
|
||||
self.flush_count += 1
|
||||
flush_id = struct.pack('>H', self.flush_count)
|
||||
for hash168, hist in self.history.items():
|
||||
key = b'H' + hash168 + flush_id
|
||||
@ -442,9 +439,10 @@ class DB(object):
|
||||
self.history = defaultdict(partial(array.array, 'I'))
|
||||
self.history_size = 0
|
||||
|
||||
def open_file(self, filename, truncate=False, create=False):
|
||||
def open_file(self, filename, create=False):
|
||||
'''Open the file name. Return its handle.'''
|
||||
try:
|
||||
return open(filename, 'wb+' if truncate else 'rb+')
|
||||
return open(filename, 'rb+')
|
||||
except FileNotFoundError:
|
||||
if create:
|
||||
return open(filename, 'wb+')
|
||||
@ -459,14 +457,15 @@ class DB(object):
|
||||
headers = b''.join(self.headers)
|
||||
header_len = self.coin.HEADER_LEN
|
||||
assert len(headers) % header_len == 0
|
||||
self.headers_file.seek(self.db_height * header_len)
|
||||
self.headers_file.seek((self.db_height + 1) * header_len)
|
||||
self.headers_file.write(headers)
|
||||
self.headers_file.flush()
|
||||
self.headers = []
|
||||
|
||||
def write_tx_counts(self):
|
||||
self.txcount_file.seek(self.db_height * self.tx_counts.itemsize)
|
||||
self.txcount_file.write(self.tx_counts[self.db_height: self.height + 1])
|
||||
self.txcount_file.seek((self.db_height + 1) * self.tx_counts.itemsize)
|
||||
self.txcount_file.write(self.tx_counts[self.db_height + 1:
|
||||
self.height + 1])
|
||||
self.txcount_file.flush()
|
||||
|
||||
def write_tx_hashes(self):
|
||||
@ -529,7 +528,7 @@ class DB(object):
|
||||
if now > self.next_cache_check:
|
||||
self.next_cache_check = now + 60
|
||||
if self.cache_MB() > self.flush_MB:
|
||||
self.flush()
|
||||
self.flush_all()
|
||||
|
||||
def process_tx(self, tx_hash, tx):
|
||||
cache = self.utxo_cache
|
||||
@ -552,13 +551,13 @@ class DB(object):
|
||||
height = bisect_right(self.tx_counts, tx_num)
|
||||
|
||||
# Is this on disk or unflushed?
|
||||
if height >= self.db_height:
|
||||
tx_hashes = self.tx_hashes[height - self.db_height]
|
||||
if height > self.db_height:
|
||||
tx_hashes = self.tx_hashes[height - (self.db_height + 1)]
|
||||
tx_hash = tx_hashes[tx_num - self.tx_counts[height - 1]]
|
||||
else:
|
||||
file_pos = tx_num * 32
|
||||
file_num, offset = divmod(file_pos, self.tx_hash_file_size)
|
||||
filename = 'hashes{:05d}'.format(file_num)
|
||||
filename = 'hashes{:04d}'.format(file_num)
|
||||
with self.open_file(filename) as f:
|
||||
f.seek(offset)
|
||||
tx_hash = f.read(32)
|
||||
|
||||
@ -43,7 +43,7 @@ class BlockCache(object):
|
||||
# Cache target size is in MB. Has little effect on sync time.
|
||||
self.cache_limit = 10
|
||||
self.daemon_height = 0
|
||||
self.fetched_height = db.db_height
|
||||
self.fetched_height = db.height
|
||||
# Blocks stored in reverse order. Next block is at end of list.
|
||||
self.blocks = []
|
||||
self.recent_sizes = []
|
||||
|
||||
Loading…
Reference in New Issue
Block a user