Move to flush_dbs in db.py

This commit is contained in:
Neil Booth 2018-08-09 10:04:17 +09:00
parent d3f9ba386c
commit 42c3a308db
2 changed files with 83 additions and 61 deletions

View File

@ -21,7 +21,7 @@ import electrumx
from electrumx.server.daemon import DaemonError from electrumx.server.daemon import DaemonError
from electrumx.lib.hash import hash_to_hex_str, HASHX_LEN from electrumx.lib.hash import hash_to_hex_str, HASHX_LEN
from electrumx.lib.util import chunks, formatted_time, class_logger from electrumx.lib.util import chunks, formatted_time, class_logger
import electrumx.server.db from electrumx.server.db import DB, FlushData
class Prefetcher(object): class Prefetcher(object):
@ -142,7 +142,7 @@ class ChainError(Exception):
'''Raised on error processing blocks.''' '''Raised on error processing blocks.'''
class BlockProcessor(electrumx.server.db.DB): class BlockProcessor(DB):
'''Process blocks and update the DB state to match. '''Process blocks and update the DB state to match.
Employ a prefetcher to prefetch blocks in batches for processing. Employ a prefetcher to prefetch blocks in batches for processing.
@ -325,37 +325,23 @@ class BlockProcessor(electrumx.server.db.DB):
else: else:
await self.run_in_thread_with_lock(self._flush_body, flush_utxos) await self.run_in_thread_with_lock(self._flush_body, flush_utxos)
def _flush_body(self, flush_utxos): def flush_data(self):
'''Flush out cached state. return FlushData(self.height, self.tx_count, self.headers,
self.tx_hashes, self.undo_infos, self.utxo_cache,
self.db_deletes, self.tip)
History is always flushed. UTXOs are flushed if flush_utxos.''' def _flush_body(self, flush_utxos):
flush_start = time.time() '''Flush out cached state. UTXOs are flushed if flush_utxos.'''
last_flush = self.last_flush last_flush = self.last_flush
tx_diff = self.tx_count - self.last_flush_tx_count tx_diff = self.tx_count - self.last_flush_tx_count
# Flush to file system self.flush_dbs(self.flush_data(), flush_utxos)
self.flush_fs(self.height, self.tx_count, self.headers,
self.tx_hashes)
self.tx_hashes = [] self.tx_hashes = []
self.headers = [] self.headers = []
if flush_utxos:
# Then history self.db_deletes = []
self.flush_history() self.utxo_cache = {}
self.undo_infos = []
# Flush state last as it reads the wall time.
with self.utxo_db.write_batch() as batch:
if flush_utxos:
self.flush_utxos(batch)
self.flush_state(batch)
# Update and put the wall time again - otherwise we drop the
# time it took to commit the batch
self.flush_state(self.utxo_db)
self.logger.info('flush #{:,d} took {:.1f}s. Height {:,d} txs: {:,d}'
.format(self.history.flush_count,
self.last_flush - flush_start,
self.height, self.tx_count))
# Catch-up stats # Catch-up stats
if self.utxo_db.for_sync: if self.utxo_db.for_sync:
@ -407,10 +393,14 @@ class BlockProcessor(electrumx.server.db.DB):
.format(nremoves)) .format(nremoves))
with self.utxo_db.write_batch() as batch: with self.utxo_db.write_batch() as batch:
self.flush_utxo_db(batch, self.flush_data())
# Flush state last as it reads the wall time. # Flush state last as it reads the wall time.
self.flush_utxos(batch)
self.flush_state(batch) self.flush_state(batch)
self.db_deletes = []
self.utxo_cache = {}
self.undo_infos = []
self.logger.info('backup flush #{:,d} took {:.1f}s. ' self.logger.info('backup flush #{:,d} took {:.1f}s. '
'Height {:,d} txs: {:,d}' 'Height {:,d} txs: {:,d}'
.format(self.history.flush_count, .format(self.history.flush_count,
@ -672,15 +662,6 @@ class BlockProcessor(electrumx.server.db.DB):
raise ChainError('UTXO {} / {:,d} not found in "h" table' raise ChainError('UTXO {} / {:,d} not found in "h" table'
.format(hash_to_hex_str(tx_hash), tx_idx)) .format(hash_to_hex_str(tx_hash), tx_idx))
def flush_utxos(self, batch):
'''Flush the cached DB writes and UTXO set to the batch.'''
self.flush_utxo_db(batch, self.db_deletes, self.utxo_cache,
self.undo_infos, self.height, self.tx_count,
self.tip)
self.db_deletes = []
self.utxo_cache = {}
self.undo_infos = []
async def _process_prefetched_blocks(self): async def _process_prefetched_blocks(self):
'''Loop forever processing blocks as they arrive.''' '''Loop forever processing blocks as they arrive.'''
while True: while True:

View File

@ -18,6 +18,7 @@ from collections import namedtuple
from glob import glob from glob import glob
from struct import pack, unpack from struct import pack, unpack
import attr
from aiorpcx import run_in_thread from aiorpcx import run_in_thread
import electrumx.lib.util as util import electrumx.lib.util as util
@ -29,6 +30,17 @@ from electrumx.server.history import History
UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value") UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")
@attr.s(slots=True)
class FlushData(object):
height = attr.ib()
tx_count = attr.ib()
headers = attr.ib()
block_tx_hashes = attr.ib()
# The following are flushed to the UTXO DB if undo_infos is not None
undo_infos = attr.ib()
adds = attr.ib()
deletes = attr.ib()
tip = attr.ib()
class DB(object): class DB(object):
'''Simple wrapper of the backend database for querying. '''Simple wrapper of the backend database for querying.
@ -155,7 +167,34 @@ class DB(object):
return await self.header_mc.branch_and_root(length, height) return await self.header_mc.branch_and_root(length, height)
# Flushing # Flushing
def flush_fs(self, to_height, to_tx_count, headers, block_tx_hashes): def flush_dbs(self, flush_data, flush_utxos):
'''Flush out cached state. History is always flushed; UTXOs are
flushed if flush_utxos.'''
start_time = time.time()
tx_delta = flush_data.tx_count - self.last_flush_tx_count
# Flush to file system
self.flush_fs(flush_data)
# Then history
self.flush_history()
# Flush state last as it reads the wall time.
with self.utxo_db.write_batch() as batch:
if flush_utxos:
self.flush_utxo_db(batch, flush_data)
self.flush_state(batch)
# Update and put the wall time again - otherwise we drop the
# time it took to commit the batch
self.flush_state(self.utxo_db)
elapsed = self.last_flush - start_time
self.logger.info(f'flush #{self.history.flush_count:,d} took '
f'{elapsed:.1f}s. Height {flush_data.height:,d} '
f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')
def flush_fs(self, flush_data):
'''Write headers, tx counts and block tx hashes to the filesystem. '''Write headers, tx counts and block tx hashes to the filesystem.
The first height to write is self.fs_height + 1. The FS The first height to write is self.fs_height + 1. The FS
@ -164,38 +203,38 @@ class DB(object):
''' '''
prior_tx_count = (self.tx_counts[self.fs_height] prior_tx_count = (self.tx_counts[self.fs_height]
if self.fs_height >= 0 else 0) if self.fs_height >= 0 else 0)
assert len(block_tx_hashes) == len(headers) assert len(flush_data.block_tx_hashes) == len(flush_data.headers)
assert to_height == self.fs_height + len(headers) assert flush_data.height == self.fs_height + len(flush_data.headers)
assert to_tx_count == self.tx_counts[-1] if self.tx_counts else 0 assert flush_data.tx_count == (self.tx_counts[-1] if self.tx_counts
assert len(self.tx_counts) == to_height + 1 else 0)
hashes = b''.join(block_tx_hashes) assert len(self.tx_counts) == flush_data.height + 1
hashes = b''.join(flush_data.block_tx_hashes)
assert len(hashes) % 32 == 0 assert len(hashes) % 32 == 0
assert len(hashes) // 32 == to_tx_count - prior_tx_count assert len(hashes) // 32 == flush_data.tx_count - prior_tx_count
# Write the headers, tx counts, and tx hashes # Write the headers, tx counts, and tx hashes
start_time = time.time() start_time = time.time()
height_start = self.fs_height + 1 height_start = self.fs_height + 1
offset = self.header_offset(height_start) offset = self.header_offset(height_start)
self.headers_file.write(offset, b''.join(headers)) self.headers_file.write(offset, b''.join(flush_data.headers))
self.fs_update_header_offsets(offset, height_start, headers) self.fs_update_header_offsets(offset, height_start, flush_data.headers)
offset = height_start * self.tx_counts.itemsize offset = height_start * self.tx_counts.itemsize
self.tx_counts_file.write(offset, self.tx_counts_file.write(offset,
self.tx_counts[height_start:].tobytes()) self.tx_counts[height_start:].tobytes())
offset = prior_tx_count * 32 offset = prior_tx_count * 32
self.hashes_file.write(offset, hashes) self.hashes_file.write(offset, hashes)
self.fs_height = to_height self.fs_height = flush_data.height
self.fs_tx_count = to_tx_count self.fs_tx_count = flush_data.tx_count
if self.utxo_db.for_sync: if self.utxo_db.for_sync:
elapsed = time.time() - start_time elapsed = time.time() - start_time
self.logger.info(f'flushed to FS in {elapsed:.2f}s') self.logger.info(f'flushed filesystem data in {elapsed:.2f}s')
def flush_history(self): def flush_history(self):
self.history.flush() self.history.flush()
def flush_utxo_db(self, batch, deletes, adds, undo_infos, def flush_utxo_db(self, batch, flush_data):
to_height, to_tx_count, to_tip):
'''Flush the cached DB writes and UTXO set to the batch.''' '''Flush the cached DB writes and UTXO set to the batch.'''
# Care is needed because the writes generated by flushing the # Care is needed because the writes generated by flushing the
# UTXO state may have keys in common with our write cache or # UTXO state may have keys in common with our write cache or
@ -204,12 +243,12 @@ class DB(object):
# Spends # Spends
batch_delete = batch.delete batch_delete = batch.delete
for key in sorted(deletes): for key in sorted(flush_data.deletes):
batch_delete(key) batch_delete(key)
# New UTXOs # New UTXOs
batch_put = batch.put batch_put = batch.put
for key, value in adds.items(): for key, value in flush_data.adds.items():
# suffix = tx_idx + tx_num # suffix = tx_idx + tx_num
hashX = value[:-12] hashX = value[:-12]
suffix = key[-2:] + value[-12:-8] suffix = key[-2:] + value[-12:-8]
@ -217,21 +256,23 @@ class DB(object):
batch_put(b'u' + hashX + suffix, value[-8:]) batch_put(b'u' + hashX + suffix, value[-8:])
# New undo information # New undo information
self.flush_undo_infos(batch_put, undo_infos) self.flush_undo_infos(batch_put, flush_data.undo_infos)
if self.utxo_db.for_sync: if self.utxo_db.for_sync:
block_count = to_height - self.db_height block_count = flush_data.height - self.db_height
tx_count = to_tx_count - self.db_tx_count tx_count = flush_data.tx_count - self.db_tx_count
add_count = len(flush_data.adds)
spend_count = len(flush_data.deletes) // 2
elapsed = time.time() - start_time elapsed = time.time() - start_time
self.logger.info(f'flushed {block_count:,d} blocks with ' self.logger.info(f'flushed {block_count:,d} blocks with '
f'{tx_count:,d} txs, {len(adds):,d} UTXO adds, ' f'{tx_count:,d} txs, {add_count:,d} UTXO adds, '
f'{len(deletes) // 2:,d} spends in ' f'{spend_count:,d} spends in '
f'{elapsed:.1f}s, committing...') f'{elapsed:.1f}s, committing...')
self.utxo_flush_count = self.history.flush_count self.utxo_flush_count = self.history.flush_count
self.db_height = to_height self.db_height = flush_data.height
self.db_tx_count = to_tx_count self.db_tx_count = flush_data.tx_count
self.db_tip = to_tip self.db_tip = flush_data.tip
def flush_state(self, batch): def flush_state(self, batch):
'''Flush chain state to the batch.''' '''Flush chain state to the batch.'''