Separate history management into its own object
- the object also manages unflushed history
- the history DB has its own version, starting at 0

This is the first step towards making history management a separate service, and towards larger block sizes. The next step is an improved history format, which I hope will both save space and be a solution to issue #185. The DB should be able to upgrade in-place without re-syncing the chain.
parent 5d22e27d52
commit 5e1ed3ffa6
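The diff below moves all history bookkeeping behind a single History object in server/history.py. As a rough sketch of the resulting call flow (not part of the commit, and simplified from the DB/BlockProcessor call sites shown below; the 'leveldb' engine name and the placeholder hashX byte strings are made-up inputs):

# Illustrative sketch only -- not part of this commit.  Assumes ElectrumX's
# repo layout (server.storage.db_class, server.history.History) and a
# writable working directory for the 'hist' database.
from server.history import History
from server.storage import db_class

history = History()
history.open_db(db_class('leveldb'), for_sync=True)    # 'leveldb' engine assumed

# The block processor now records one list of touched hashXs per tx and hands
# them over in bulk, instead of maintaining its own defaultdict of arrays.
hashX_a, hashX_b = b'a' * 11, b'b' * 11                 # placeholder hashXs (HASHX_LEN bytes assumed)
history.add_unflushed([[hashX_a], [hashX_a, hashX_b]], first_tx_num=0)
print(history.unflushed_memsize())                      # rough RAM estimate used for flush decisions
print(history.flush())                                  # writes one numbered flush, returns # of hashXs

# A reorg rolls back entries >= tx_count; reads go through the same object.
history.backup({hashX_a, hashX_b}, tx_count=1)
print(list(history.get_txnums(hashX_a, limit=None)))    # -> [0]

After this change the UTXO DB and the history DB keep separate flush counts; db.set_flush_count() (added below) is how the compaction script re-synchronises the UTXO side once compaction completes.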
@@ -49,16 +49,19 @@ def compact_history():
     db = DB(env)
     assert not db.first_sync
+    history = db.history
     # Continue where we left off, if interrupted
-    if db.comp_cursor == -1:
-        db.comp_cursor = 0
+    if history.comp_cursor == -1:
+        history.comp_cursor = 0

-    db.comp_flush_count = max(db.comp_flush_count, 1)
+    history.comp_flush_count = max(history.comp_flush_count, 1)
     limit = 8 * 1000 * 1000

-    while db.comp_cursor != -1:
-        db._compact_history(limit)
+    while history.comp_cursor != -1:
+        history._compact_history(limit)

+    # When completed also update the UTXO flush count
+    db.set_flush_count(history.flush_count)

 def main():
     logging.basicConfig(level=logging.INFO)

@@ -128,6 +128,13 @@ def chunks(items, size):
         yield items[i: i + size]


+def resolve_limit(limit):
+    if limit is None:
+        return -1
+    assert isinstance(limit, int) and limit >= 0
+    return limit
+
+
 def bytes_to_int(be_bytes):
     '''Interprets a big-endian sequence of bytes as an integer'''
     return int.from_bytes(be_bytes, 'big')

@@ -145,7 +145,7 @@ class BlockProcessor(server.db.DB):

         # An incomplete compaction needs to be cancelled otherwise
         # restarting it will corrupt the history
-        self.cancel_history_compaction()
+        self.history.cancel_compaction()

         self.daemon = daemon
         self.controller = controller
@@ -171,8 +171,6 @@ class BlockProcessor(server.db.DB):
         self.headers = []
         self.tx_hashes = []
         self.undo_infos = []
-        self.history = defaultdict(partial(array.array, 'I'))
-        self.history_size = 0

         # UTXO cache
         self.utxo_cache = {}
@@ -348,9 +346,9 @@ class BlockProcessor(server.db.DB):
         assert self.tx_count == self.fs_tx_count == self.db_tx_count
         assert self.height == self.fs_height == self.db_height
         assert not self.undo_infos
-        assert not self.history
         assert not self.utxo_cache
         assert not self.db_deletes
+        self.history.assert_flushed()

     def flush(self, flush_utxos=False):
         '''Flush out cached state.
@@ -372,12 +370,10 @@ class BlockProcessor(server.db.DB):
                              .format(fs_end - flush_start))

         # History next - it's fast and frees memory
-        self.flush_history(self.history)
+        hashX_count = self.history.flush()
         if self.utxo_db.for_sync:
             self.logger.info('flushed history in {:.1f}s for {:,d} addrs'
-                             .format(time.time() - fs_end, len(self.history)))
-        self.history = defaultdict(partial(array.array, 'I'))
-        self.history_size = 0
+                             .format(time.time() - fs_end, hashX_count))

         # Flush state last as it reads the wall time.
         with self.utxo_db.write_batch() as batch:
@@ -390,7 +386,7 @@ class BlockProcessor(server.db.DB):
             self.flush_state(self.utxo_db)

         self.logger.info('flush #{:,d} took {:.1f}s. Height {:,d} txs: {:,d}'
-                         .format(self.flush_count,
+                         .format(self.history.flush_count,
                                  self.last_flush - flush_start,
                                  self.height, self.tx_count))

@@ -437,7 +433,7 @@ class BlockProcessor(server.db.DB):
         height.
         '''
         assert self.height < self.db_height
-        assert not self.history
+        self.history.assert_flushed()

         flush_start = time.time()

@@ -450,7 +446,7 @@ class BlockProcessor(server.db.DB):
         # Backup history. self.touched can include other addresses
         # which is harmless, but remove None.
         self.touched.discard(None)
-        nremoves = self.backup_history(self.touched)
+        nremoves = self.history.backup(self.touched, self.tx_count)
         self.logger.info('backing up removed {:,d} history entries'
                          .format(nremoves))

@@ -461,7 +457,7 @@ class BlockProcessor(server.db.DB):

         self.logger.info('backup flush #{:,d} took {:.1f}s. '
                          'Height {:,d} txs: {:,d}'
-                         .format(self.flush_count,
+                         .format(self.history.flush_count,
                                  self.last_flush - flush_start,
                                  self.height, self.tx_count))

@@ -472,7 +468,7 @@ class BlockProcessor(server.db.DB):
         one_MB = 1000*1000
         utxo_cache_size = len(self.utxo_cache) * 205
         db_deletes_size = len(self.db_deletes) * 57
-        hist_cache_size = len(self.history) * 180 + self.history_size * 4
+        hist_cache_size = self.history.unflushed_memsize()
         # Roughly ntxs * 32 + nblocks * 42
         tx_hash_size = ((self.tx_count - self.fs_tx_count) * 32
                         + (self.height - self.fs_height) * 42)
@@ -522,19 +518,19 @@ class BlockProcessor(server.db.DB):

         # Use local vars for speed in the loops
         undo_info = []
-        history = self.history
-        history_size = self.history_size
         tx_num = self.tx_count
         script_hashX = self.coin.hashX_from_script
         s_pack = pack
         put_utxo = self.utxo_cache.__setitem__
         spend_utxo = self.spend_utxo
         undo_info_append = undo_info.append
-        touched = self.touched
+        update_touched = self.touched.update
+        hashXs_by_tx = []
+        append_hashXs = hashXs_by_tx.append

         for tx, tx_hash in txs:
-            hashXs = set()
-            add_hashX = hashXs.add
+            hashXs = []
+            append_hashX = hashXs.append
             tx_numb = s_pack('<I', tx_num)

             # Spend the inputs
@@ -542,26 +538,25 @@ class BlockProcessor(server.db.DB):
             for txin in tx.inputs:
                 cache_value = spend_utxo(txin.prev_hash, txin.prev_idx)
                 undo_info_append(cache_value)
-                add_hashX(cache_value[:-12])
+                append_hashX(cache_value[:-12])

             # Add the new UTXOs
             for idx, txout in enumerate(tx.outputs):
                 # Get the hashX. Ignore unspendable outputs
                 hashX = script_hashX(txout.pk_script)
                 if hashX:
-                    add_hashX(hashX)
+                    append_hashX(hashX)
                     put_utxo(tx_hash + s_pack('<H', idx),
                              hashX + tx_numb + s_pack('<Q', txout.value))

-            for hashX in hashXs:
-                history[hashX].append(tx_num)
-            history_size += len(hashXs)
-            touched.update(hashXs)
+            append_hashXs(hashXs)
+            update_touched(hashXs)
             tx_num += 1

+        self.history.add_unflushed(hashXs_by_tx, self.tx_count)
+
         self.tx_count = tx_num
         self.tx_counts.append(tx_num)
-        self.history_size = history_size

         return undo_info

@@ -764,7 +759,7 @@ class BlockProcessor(server.db.DB):
                                utxo_cache_len, delete_count,
                                time.time() - flush_start))

-        self.utxo_flush_count = self.flush_count
+        self.utxo_flush_count = self.history.flush_count
         self.db_tx_count = self.tx_count
         self.db_height = self.height
         self.db_tip = self.tip

server/db.py
@@ -14,12 +14,13 @@ import ast
 import logging
 import os
 from struct import pack, unpack
-from bisect import bisect_left, bisect_right
+from bisect import bisect_right
 from collections import namedtuple

 import lib.util as util
 from lib.hash import hash_to_str, HASHX_LEN
 from server.storage import db_class
+from server.history import History


 UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")
@@ -60,12 +61,9 @@ class DB(object):
         self.db_class = db_class(self.env.db_engine)
         self.logger.info('using {} for DB backend'.format(self.env.db_engine))

-        # For history compaction
-        self.max_hist_row_entries = 12500
-
+        self.history = History()
         self.utxo_db = None
         self.open_dbs()
         self.clean_db()

         self.logger.info('reorg limit is {:,d} blocks'
                          .format(self.env.reorg_limit))
@@ -109,11 +107,10 @@ class DB(object):
                 return
             log_reason('closing DB to re-open', for_sync)
             self.utxo_db.close()
-            self.hist_db.close()
+            self.history.close_db()

         # Open DB and metadata files. Record some of its state.
         self.utxo_db = self.db_class('utxo', for_sync)
-        self.hist_db = self.db_class('hist', for_sync)
         if self.utxo_db.is_new:
             self.logger.info('created new database')
             self.logger.info('creating metadata directory')
@@ -128,7 +125,7 @@ class DB(object):
             if self.first_sync == self.utxo_db.for_sync:
                 break

-        self.read_history_state()
+        self.history.open_db(self.db_class, for_sync)

         self.logger.info('DB version: {:d}'.format(self.db_version))
         self.logger.info('coin: {}'.format(self.coin.NAME))
@@ -136,7 +133,6 @@ class DB(object):
         self.logger.info('height: {:,d}'.format(self.db_height))
         self.logger.info('tip: {}'.format(hash_to_str(self.db_tip)))
         self.logger.info('tx count: {:,d}'.format(self.db_tx_count))
-        self.logger.info('flush count: {:,d}'.format(self.flush_count))
         if self.first_sync:
             self.logger.info('sync time so far: {}'
                              .format(util.formatted_time(self.wall_time)))
@@ -148,12 +144,7 @@ class DB(object):
         recent UTXO flush (only happens on unclean shutdown), and aged
         undo information.
         '''
-        if self.flush_count < self.utxo_flush_count:
-            # Might happen at end of compaction as both DBs cannot be
-            # updated atomically
-            self.utxo_flush_count = self.flush_count
-        if self.flush_count > self.utxo_flush_count:
-            self.clear_excess_history(self.utxo_flush_count)
+        self.utxo_flush_count = self.history.clear_excess(self.utxo_flush_count)
         self.clear_excess_undo_info()

     def fs_update_header_offsets(self, offset_start, height_start, headers):
@@ -253,12 +244,15 @@ class DB(object):

         return [self.coin.header_hash(header) for header in headers]

-    @staticmethod
-    def _resolve_limit(limit):
-        if limit is None:
-            return -1
-        assert isinstance(limit, int) and limit >= 0
-        return limit
+    def get_history(self, hashX, limit=1000):
+        '''Generator that returns an unpruned, sorted list of (tx_hash,
+        height) tuples of confirmed transactions that touched the address,
+        earliest in the blockchain first. Includes both spending and
+        receiving transactions. By default yields at most 1000 entries.
+        Set limit to None to get them all.
+        '''
+        for tx_num in self.history.get_txnums(hashX, limit):
+            yield self.fs_tx_hash(tx_num)

     # -- Undo information

@@ -315,8 +309,8 @@ class DB(object):
             raise self.DBError('failed reading state from DB')
         self.db_version = state['db_version']
         if self.db_version not in self.DB_VERSIONS:
-            raise self.DBError('your DB version is {} but this software '
-                               'only handles versions {}'
+            raise self.DBError('your UTXO DB version is {} but this '
+                               'software only handles versions {}'
                                .format(self.db_version, self.DB_VERSIONS))
         # backwards compat
         genesis_hash = state['genesis']
@@ -347,6 +341,11 @@ class DB(object):
         }
         batch.put(b'state', repr(state).encode())

+    def set_flush_count(self, count):
+        self.utxo_flush_count = count
+        with self.utxo_db.write_batch() as batch:
+            self.write_utxo_state(batch)
+
     def get_balance(self, hashX):
         '''Returns the confirmed balance of an address.'''
         return sum(utxo.value for utxo in self.get_utxos(hashX, limit=None))
@@ -356,7 +355,7 @@ class DB(object):
         particular order. By default yields at most 1000 entries.
         Set limit to None to get them all.
         '''
-        limit = self._resolve_limit(limit)
+        limit = util.resolve_limit(limit)
         s_unpack = unpack
         # Key: b'u' + address_hashX + tx_idx + tx_num
         # Value: the UTXO value as a 64-bit unsigned integer
@@ -410,273 +409,3 @@ class DB(object):
                 return hashX, tx_num_packed

         return None, None
-
-    # -- History database
-
-    def clear_excess_history(self, flush_count):
-        self.logger.info('DB shut down uncleanly. Scanning for '
-                         'excess history flushes...')
-
-        keys = []
-        for key, hist in self.hist_db.iterator(prefix=b''):
-            flush_id, = unpack('>H', key[-2:])
-            if flush_id > flush_count:
-                keys.append(key)
-
-        self.logger.info('deleting {:,d} history entries'.format(len(keys)))
-
-        self.flush_count = flush_count
-        with self.hist_db.write_batch() as batch:
-            for key in keys:
-                batch.delete(key)
-            self.write_history_state(batch)
-
-        self.logger.info('deleted excess history entries')
-
-    def write_history_state(self, batch):
-        '''Write state to hist_db.'''
-        state = {
-            'flush_count': self.flush_count,
-            'comp_flush_count': self.comp_flush_count,
-            'comp_cursor': self.comp_cursor,
-        }
-        # History entries are not prefixed; the suffix \0\0 ensures we
-        # look similar to other entries and aren't interfered with
-        batch.put(b'state\0\0', repr(state).encode())
-
-    def read_history_state(self):
-        state = self.hist_db.get(b'state\0\0')
-        if state:
-            state = ast.literal_eval(state.decode())
-            if not isinstance(state, dict):
-                raise self.DBError('failed reading state from history DB')
-            self.flush_count = state['flush_count']
-            self.comp_flush_count = state.get('comp_flush_count', -1)
-            self.comp_cursor = state.get('comp_cursor', -1)
-        else:
-            self.flush_count = 0
-            self.comp_flush_count = -1
-            self.comp_cursor = -1
-
-    def flush_history(self, history):
-        self.flush_count += 1
-        flush_id = pack('>H', self.flush_count)
-
-        with self.hist_db.write_batch() as batch:
-            for hashX in sorted(history):
-                key = hashX + flush_id
-                batch.put(key, history[hashX].tobytes())
-            self.write_history_state(batch)
-
-    def backup_history(self, hashXs):
-        # Not certain this is needed, but it doesn't hurt
-        self.flush_count += 1
-        nremoves = 0
-
-        with self.hist_db.write_batch() as batch:
-            for hashX in sorted(hashXs):
-                deletes = []
-                puts = {}
-                for key, hist in self.hist_db.iterator(prefix=hashX,
-                                                       reverse=True):
-                    a = array.array('I')
-                    a.frombytes(hist)
-                    # Remove all history entries >= self.tx_count
-                    idx = bisect_left(a, self.tx_count)
-                    nremoves += len(a) - idx
-                    if idx > 0:
-                        puts[key] = a[:idx].tobytes()
-                        break
-                    deletes.append(key)
-
-                for key in deletes:
-                    batch.delete(key)
-                for key, value in puts.items():
-                    batch.put(key, value)
-            self.write_history_state(batch)
-
-        return nremoves
-
-    def get_history_txnums(self, hashX, limit=1000):
-        '''Generator that returns an unpruned, sorted list of tx_nums in the
-        history of a hashX. Includes both spending and receiving
-        transactions. By default yields at most 1000 entries. Set
-        limit to None to get them all. '''
-        limit = self._resolve_limit(limit)
-        for key, hist in self.hist_db.iterator(prefix=hashX):
-            a = array.array('I')
-            a.frombytes(hist)
-            for tx_num in a:
-                if limit == 0:
-                    return
-                yield tx_num
-                limit -= 1
-
-    def get_history(self, hashX, limit=1000):
-        '''Generator that returns an unpruned, sorted list of (tx_hash,
-        height) tuples of confirmed transactions that touched the address,
-        earliest in the blockchain first. Includes both spending and
-        receiving transactions. By default yields at most 1000 entries.
-        Set limit to None to get them all.
-        '''
-        for tx_num in self.get_history_txnums(hashX, limit):
-            yield self.fs_tx_hash(tx_num)
-
-    #
-    # History compaction
-    #
-
-    # comp_cursor is a cursor into compaction progress.
-    # -1: no compaction in progress
-    # 0-65535: Compaction in progress; all prefixes < comp_cursor have
-    #     been compacted, and later ones have not.
-    # 65536: compaction complete in-memory but not flushed
-    #
-    # comp_flush_count applies during compaction, and is a flush count
-    # for history with prefix < comp_cursor. flush_count applies
-    # to still uncompacted history. It is -1 when no compaction is
-    # taking place. Key suffixes up to and including comp_flush_count
-    # are used, so a parallel history flush must first increment this
-    #
-    # When compaction is complete and the final flush takes place,
-    # flush_count is reset to comp_flush_count, and comp_flush_count to -1
-
-    def _flush_compaction(self, cursor, write_items, keys_to_delete):
-        '''Flush a single compaction pass as a batch.'''
-        # Update compaction state
-        if cursor == 65536:
-            self.flush_count = self.comp_flush_count
-            self.comp_cursor = -1
-            self.comp_flush_count = -1
-        else:
-            self.comp_cursor = cursor
-
-        # History DB. Flush compacted history and updated state
-        with self.hist_db.write_batch() as batch:
-            # Important: delete first! The keyspace may overlap.
-            for key in keys_to_delete:
-                batch.delete(key)
-            for key, value in write_items:
-                batch.put(key, value)
-            self.write_history_state(batch)
-
-        # If compaction was completed also update the UTXO flush count
-        if cursor == 65536:
-            self.utxo_flush_count = self.flush_count
-            with self.utxo_db.write_batch() as batch:
-                self.write_utxo_state(batch)
-
-    def _compact_hashX(self, hashX, hist_map, hist_list,
-                       write_items, keys_to_delete):
-        '''Compres history for a hashX. hist_list is an ordered list of
-        the histories to be compressed.'''
-        # History entries (tx numbers) are 4 bytes each. Distribute
-        # over rows of up to 50KB in size. A fixed row size means
-        # future compactions will not need to update the first N - 1
-        # rows.
-        max_row_size = self.max_hist_row_entries * 4
-        full_hist = b''.join(hist_list)
-        nrows = (len(full_hist) + max_row_size - 1) // max_row_size
-        if nrows > 4:
-            self.logger.info('hashX {} is large: {:,d} entries across '
-                             '{:,d} rows'
-                             .format(hash_to_str(hashX), len(full_hist) // 4,
-                                     nrows))
-
-        # Find what history needs to be written, and what keys need to
-        # be deleted. Start by assuming all keys are to be deleted,
-        # and then remove those that are the same on-disk as when
-        # compacted.
-        write_size = 0
-        keys_to_delete.update(hist_map)
-        for n, chunk in enumerate(util.chunks(full_hist, max_row_size)):
-            key = hashX + pack('>H', n)
-            if hist_map.get(key) == chunk:
-                keys_to_delete.remove(key)
-            else:
-                write_items.append((key, chunk))
-                write_size += len(chunk)
-
-        assert n + 1 == nrows
-        self.comp_flush_count = max(self.comp_flush_count, n)
-
-        return write_size
-
-    def _compact_prefix(self, prefix, write_items, keys_to_delete):
-        '''Compact all history entries for hashXs beginning with the
-        given prefix. Update keys_to_delete and write.'''
-        prior_hashX = None
-        hist_map = {}
-        hist_list = []
-
-        key_len = HASHX_LEN + 2
-        write_size = 0
-        for key, hist in self.hist_db.iterator(prefix=prefix):
-            # Ignore non-history entries
-            if len(key) != key_len:
-                continue
-            hashX = key[:-2]
-            if hashX != prior_hashX and prior_hashX:
-                write_size += self._compact_hashX(prior_hashX, hist_map,
-                                                  hist_list, write_items,
-                                                  keys_to_delete)
-                hist_map.clear()
-                hist_list.clear()
-            prior_hashX = hashX
-            hist_map[key] = hist
-            hist_list.append(hist)
-
-        if prior_hashX:
-            write_size += self._compact_hashX(prior_hashX, hist_map, hist_list,
-                                              write_items, keys_to_delete)
-        return write_size
-
-    def _compact_history(self, limit):
-        '''Inner loop of history compaction. Loops until limit bytes have
-        been processed.
-        '''
-        keys_to_delete = set()
-        write_items = [] # A list of (key, value) pairs
-        write_size = 0
-
-        # Loop over 2-byte prefixes
-        cursor = self.comp_cursor
-        while write_size < limit and cursor < 65536:
-            prefix = pack('>H', cursor)
-            write_size += self._compact_prefix(prefix, write_items,
-                                               keys_to_delete)
-            cursor += 1
-
-        max_rows = self.comp_flush_count + 1
-        self._flush_compaction(cursor, write_items, keys_to_delete)
-
-        self.logger.info('history compaction: wrote {:,d} rows ({:.1f} MB), '
-                         'removed {:,d} rows, largest: {:,d}, {:.1f}% complete'
-                         .format(len(write_items), write_size / 1000000,
-                                 len(keys_to_delete), max_rows,
-                                 100 * cursor / 65536))
-        return write_size
-
-    async def compact_history(self, loop):
-        '''Start a background history compaction and reset the flush count if
-        its getting high.
-        '''
-        # Do nothing if during initial sync or if a compaction hasn't
-        # been initiated
-        if self.first_sync or self.comp_cursor == -1:
-            return
-
-        self.comp_flush_count = max(self.comp_flush_count, 1)
-        limit = 50 * 1000 * 1000
-
-        while self.comp_cursor != -1:
-            if self.semaphore.locked:
-                self.logger.info('compact_history: waiting on semaphore...')
-            async with self.semaphore:
-                await loop.run_in_executor(None, self._compact_history, limit)
-
-    def cancel_history_compaction(self):
-        if self.comp_cursor != -1:
-            self.logger.warning('cancelling in-progress history compaction')
-            self.comp_flush_count = -1
-            self.comp_cursor = -1

server/history.py (new file)
@@ -0,0 +1,311 @@
+# Copyright (c) 2016-2018, Neil Booth
+# Copyright (c) 2017, the ElectrumX authors
+#
+# All rights reserved.
+#
+# See the file "LICENCE" for information about the copyright
+# and warranty status of this software.
+
+'''History by script hash (address).'''
+
+import array
+import ast
+import bisect
+from collections import defaultdict
+from functools import partial
+import logging
+from struct import pack, unpack
+
+import lib.util as util
+from lib.hash import hash_to_str, HASHX_LEN
+
+
+class History(object):
+
+    DB_VERSIONS = [0]
+
+    def __init__(self):
+        self.logger = logging.getLogger(self.__class__.__name__)
+        # For history compaction
+        self.max_hist_row_entries = 12500
+        self.unflushed = defaultdict(partial(array.array, 'I'))
+        self.unflushed_count = 0
+
+    def open_db(self, db_class, for_sync):
+        self.db = db_class('hist', for_sync)
+        self.read_state()
+
+    def close_db(self):
+        self.db.close()
+
+    def read_state(self):
+        state = self.db.get(b'state\0\0')
+        if state:
+            state = ast.literal_eval(state.decode())
+            if not isinstance(state, dict):
+                raise RuntimeError('failed reading state from history DB')
+            self.flush_count = state['flush_count']
+            self.comp_flush_count = state.get('comp_flush_count', -1)
+            self.comp_cursor = state.get('comp_cursor', -1)
+            self.db_version = state.get('db_version', 0)
+        else:
+            self.flush_count = 0
+            self.comp_flush_count = -1
+            self.comp_cursor = -1
+            self.db_version = max(self.DB_VERSIONS)
+
+        self.logger.info(f'flush count: {self.flush_count:,d}')
+        self.logger.info(f'history DB version: {self.db_version}')
+        if self.db_version not in self.DB_VERSIONS:
+            msg = f'this software only handles DB versions {self.DB_VERSIONS}'
+            self.logger.error(msg)
+            raise RuntimeError(msg)
+
+    def clear_excess(self, flush_count):
+        # < might happen at end of compaction as both DBs cannot be
+        # updated atomically
+        if self.flush_count <= flush_count:
+            return self.flush_count
+
+        self.logger.info('DB shut down uncleanly. Scanning for '
+                         'excess history flushes...')
+
+        keys = []
+        for key, hist in self.db.iterator(prefix=b''):
+            flush_id, = unpack('>H', key[-2:])
+            if flush_id > flush_count:
+                keys.append(key)
+
+        self.logger.info('deleting {:,d} history entries'.format(len(keys)))
+
+        self.flush_count = flush_count
+        with self.db.write_batch() as batch:
+            for key in keys:
+                batch.delete(key)
+            self.write_state(batch)
+
+        self.logger.info('deleted excess history entries')
+        return self.flush_count
+
+    def write_state(self, batch):
+        '''Write state to the history DB.'''
+        state = {
+            'flush_count': self.flush_count,
+            'comp_flush_count': self.comp_flush_count,
+            'comp_cursor': self.comp_cursor,
+            'db_version': self.db_version,
+        }
+        # History entries are not prefixed; the suffix \0\0 ensures we
+        # look similar to other entries and aren't interfered with
+        batch.put(b'state\0\0', repr(state).encode())
+
+    def add_unflushed(self, hashXs_by_tx, first_tx_num):
+        unflushed = self.unflushed
+        count = 0
+        for tx_num, hashXs in enumerate(hashXs_by_tx, start=first_tx_num):
+            hashXs = set(hashXs)
+            for hashX in hashXs:
+                unflushed[hashX].append(tx_num)
+            count += len(hashXs)
+        self.unflushed_count += count
+
+    def unflushed_memsize(self):
+        return len(self.unflushed) * 180 + self.unflushed_count * 4
+
+    def assert_flushed(self):
+        assert not self.unflushed
+
+    def flush(self):
+        self.flush_count += 1
+        flush_id = pack('>H', self.flush_count)
+        unflushed = self.unflushed
+
+        with self.db.write_batch() as batch:
+            for hashX in sorted(unflushed):
+                key = hashX + flush_id
+                batch.put(key, unflushed[hashX].tobytes())
+            self.write_state(batch)
+
+        count = len(unflushed)
+        unflushed.clear()
+        self.unflushed_count = 0
+        return count
+
+    def backup(self, hashXs, tx_count):
+        # Not certain this is needed, but it doesn't hurt
+        self.flush_count += 1
+        nremoves = 0
+        bisect_left = bisect.bisect_left
+
+        with self.db.write_batch() as batch:
+            for hashX in sorted(hashXs):
+                deletes = []
+                puts = {}
+                for key, hist in self.db.iterator(prefix=hashX, reverse=True):
+                    a = array.array('I')
+                    a.frombytes(hist)
+                    # Remove all history entries >= tx_count
+                    idx = bisect_left(a, tx_count)
+                    nremoves += len(a) - idx
+                    if idx > 0:
+                        puts[key] = a[:idx].tobytes()
+                        break
+                    deletes.append(key)
+
+                for key in deletes:
+                    batch.delete(key)
+                for key, value in puts.items():
+                    batch.put(key, value)
+            self.write_state(batch)
+
+        return nremoves
+
+    def get_txnums(self, hashX, limit=1000):
+        '''Generator that returns an unpruned, sorted list of tx_nums in the
+        history of a hashX. Includes both spending and receiving
+        transactions. By default yields at most 1000 entries. Set
+        limit to None to get them all. '''
+        limit = util.resolve_limit(limit)
+        for key, hist in self.db.iterator(prefix=hashX):
+            a = array.array('I')
+            a.frombytes(hist)
+            for tx_num in a:
+                if limit == 0:
+                    return
+                yield tx_num
+                limit -= 1
+
+    #
+    # History compaction
+    #
+
+    # comp_cursor is a cursor into compaction progress.
+    # -1: no compaction in progress
+    # 0-65535: Compaction in progress; all prefixes < comp_cursor have
+    #     been compacted, and later ones have not.
+    # 65536: compaction complete in-memory but not flushed
+    #
+    # comp_flush_count applies during compaction, and is a flush count
+    # for history with prefix < comp_cursor. flush_count applies
+    # to still uncompacted history. It is -1 when no compaction is
+    # taking place. Key suffixes up to and including comp_flush_count
+    # are used, so a parallel history flush must first increment this
+    #
+    # When compaction is complete and the final flush takes place,
+    # flush_count is reset to comp_flush_count, and comp_flush_count to -1
+
+    def _flush_compaction(self, cursor, write_items, keys_to_delete):
+        '''Flush a single compaction pass as a batch.'''
+        # Update compaction state
+        if cursor == 65536:
+            self.flush_count = self.comp_flush_count
+            self.comp_cursor = -1
+            self.comp_flush_count = -1
+        else:
+            self.comp_cursor = cursor
+
+        # History DB. Flush compacted history and updated state
+        with self.db.write_batch() as batch:
+            # Important: delete first! The keyspace may overlap.
+            for key in keys_to_delete:
+                batch.delete(key)
+            for key, value in write_items:
+                batch.put(key, value)
+            self.write_state(batch)
+
+    def _compact_hashX(self, hashX, hist_map, hist_list,
+                       write_items, keys_to_delete):
+        '''Compres history for a hashX. hist_list is an ordered list of
+        the histories to be compressed.'''
+        # History entries (tx numbers) are 4 bytes each. Distribute
+        # over rows of up to 50KB in size. A fixed row size means
+        # future compactions will not need to update the first N - 1
+        # rows.
+        max_row_size = self.max_hist_row_entries * 4
+        full_hist = b''.join(hist_list)
+        nrows = (len(full_hist) + max_row_size - 1) // max_row_size
+        if nrows > 4:
+            self.logger.info('hashX {} is large: {:,d} entries across '
+                             '{:,d} rows'
+                             .format(hash_to_str(hashX), len(full_hist) // 4,
+                                     nrows))
+
+        # Find what history needs to be written, and what keys need to
+        # be deleted. Start by assuming all keys are to be deleted,
+        # and then remove those that are the same on-disk as when
+        # compacted.
+        write_size = 0
+        keys_to_delete.update(hist_map)
+        for n, chunk in enumerate(util.chunks(full_hist, max_row_size)):
+            key = hashX + pack('>H', n)
+            if hist_map.get(key) == chunk:
+                keys_to_delete.remove(key)
+            else:
+                write_items.append((key, chunk))
+                write_size += len(chunk)
+
+        assert n + 1 == nrows
+        self.comp_flush_count = max(self.comp_flush_count, n)
+
+        return write_size
+
+    def _compact_prefix(self, prefix, write_items, keys_to_delete):
+        '''Compact all history entries for hashXs beginning with the
+        given prefix. Update keys_to_delete and write.'''
+        prior_hashX = None
+        hist_map = {}
+        hist_list = []
+
+        key_len = HASHX_LEN + 2
+        write_size = 0
+        for key, hist in self.db.iterator(prefix=prefix):
+            # Ignore non-history entries
+            if len(key) != key_len:
+                continue
+            hashX = key[:-2]
+            if hashX != prior_hashX and prior_hashX:
+                write_size += self._compact_hashX(prior_hashX, hist_map,
+                                                  hist_list, write_items,
+                                                  keys_to_delete)
+                hist_map.clear()
+                hist_list.clear()
+            prior_hashX = hashX
+            hist_map[key] = hist
+            hist_list.append(hist)
+
+        if prior_hashX:
+            write_size += self._compact_hashX(prior_hashX, hist_map, hist_list,
+                                              write_items, keys_to_delete)
+        return write_size
+
+    def _compact_history(self, limit):
+        '''Inner loop of history compaction. Loops until limit bytes have
+        been processed.
+        '''
+        keys_to_delete = set()
+        write_items = [] # A list of (key, value) pairs
+        write_size = 0
+
+        # Loop over 2-byte prefixes
+        cursor = self.comp_cursor
+        while write_size < limit and cursor < 65536:
+            prefix = pack('>H', cursor)
+            write_size += self._compact_prefix(prefix, write_items,
+                                               keys_to_delete)
+            cursor += 1
+
+        max_rows = self.comp_flush_count + 1
+        self._flush_compaction(cursor, write_items, keys_to_delete)
+
+        self.logger.info('history compaction: wrote {:,d} rows ({:.1f} MB), '
+                         'removed {:,d} rows, largest: {:,d}, {:.1f}% complete'
+                         .format(len(write_items), write_size / 1000000,
+                                 len(keys_to_delete), max_rows,
+                                 100 * cursor / 65536))
+        return write_size
+
+    def cancel_compaction(self):
+        if self.comp_cursor != -1:
+            self.logger.warning('cancelling in-progress history compaction')
+            self.comp_flush_count = -1
+            self.comp_cursor = -1

@@ -95,7 +95,7 @@ def check_hashX_compaction(db):

 def check_written(db, histories):
     for hashX, hist in histories.items():
-        db_hist = array.array('I', db.get_history_txnums(hashX, limit=None))
+        db_hist = array.array('I', db.history.get_txnums(hashX, limit=None))
         assert hist == db_hist

 def compact_history(db):