diff --git a/electrumx/server/block_processor.py b/electrumx/server/block_processor.py index e0ccf57..a7ecc0a 100644 --- a/electrumx/server/block_processor.py +++ b/electrumx/server/block_processor.py @@ -21,7 +21,7 @@ import electrumx from electrumx.server.daemon import DaemonError from electrumx.lib.hash import hash_to_hex_str, HASHX_LEN from electrumx.lib.util import chunks, formatted_time, class_logger -import electrumx.server.db +from electrumx.server.db import DB, FlushData class Prefetcher(object): @@ -142,7 +142,7 @@ class ChainError(Exception): '''Raised on error processing blocks.''' -class BlockProcessor(electrumx.server.db.DB): +class BlockProcessor(DB): '''Process blocks and update the DB state to match. Employ a prefetcher to prefetch blocks in batches for processing. @@ -325,37 +325,23 @@ class BlockProcessor(electrumx.server.db.DB): else: await self.run_in_thread_with_lock(self._flush_body, flush_utxos) - def _flush_body(self, flush_utxos): - '''Flush out cached state. + def flush_data(self): + return FlushData(self.height, self.tx_count, self.headers, + self.tx_hashes, self.undo_infos, self.utxo_cache, + self.db_deletes, self.tip) - History is always flushed. UTXOs are flushed if flush_utxos.''' - flush_start = time.time() + def _flush_body(self, flush_utxos): + '''Flush out cached state. UTXOs are flushed if flush_utxos.''' last_flush = self.last_flush tx_diff = self.tx_count - self.last_flush_tx_count - # Flush to file system - self.flush_fs(self.height, self.tx_count, self.headers, - self.tx_hashes) + self.flush_dbs(self.flush_data(), flush_utxos) self.tx_hashes = [] self.headers = [] - - # Then history - self.flush_history() - - # Flush state last as it reads the wall time. - with self.utxo_db.write_batch() as batch: - if flush_utxos: - self.flush_utxos(batch) - self.flush_state(batch) - - # Update and put the wall time again - otherwise we drop the - # time it took to commit the batch - self.flush_state(self.utxo_db) - - self.logger.info('flush #{:,d} took {:.1f}s. Height {:,d} txs: {:,d}' - .format(self.history.flush_count, - self.last_flush - flush_start, - self.height, self.tx_count)) + if flush_utxos: + self.db_deletes = [] + self.utxo_cache = {} + self.undo_infos = [] # Catch-up stats if self.utxo_db.for_sync: @@ -407,10 +393,14 @@ class BlockProcessor(electrumx.server.db.DB): .format(nremoves)) with self.utxo_db.write_batch() as batch: + self.flush_utxo_db(batch, self.flush_data()) # Flush state last as it reads the wall time. - self.flush_utxos(batch) self.flush_state(batch) + self.db_deletes = [] + self.utxo_cache = {} + self.undo_infos = [] + self.logger.info('backup flush #{:,d} took {:.1f}s. ' 'Height {:,d} txs: {:,d}' .format(self.history.flush_count, @@ -672,15 +662,6 @@ class BlockProcessor(electrumx.server.db.DB): raise ChainError('UTXO {} / {:,d} not found in "h" table' .format(hash_to_hex_str(tx_hash), tx_idx)) - def flush_utxos(self, batch): - '''Flush the cached DB writes and UTXO set to the batch.''' - self.flush_utxo_db(batch, self.db_deletes, self.utxo_cache, - self.undo_infos, self.height, self.tx_count, - self.tip) - self.db_deletes = [] - self.utxo_cache = {} - self.undo_infos = [] - async def _process_prefetched_blocks(self): '''Loop forever processing blocks as they arrive.''' while True: diff --git a/electrumx/server/db.py b/electrumx/server/db.py index f86bcd8..d5d62ca 100644 --- a/electrumx/server/db.py +++ b/electrumx/server/db.py @@ -18,6 +18,7 @@ from collections import namedtuple from glob import glob from struct import pack, unpack +import attr from aiorpcx import run_in_thread import electrumx.lib.util as util @@ -29,6 +30,17 @@ from electrumx.server.history import History UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value") +@attr.s(slots=True) +class FlushData(object): + height = attr.ib() + tx_count = attr.ib() + headers = attr.ib() + block_tx_hashes = attr.ib() + # The following are flushed to the UTXO DB if undo_infos is not None + undo_infos = attr.ib() + adds = attr.ib() + deletes = attr.ib() + tip = attr.ib() class DB(object): '''Simple wrapper of the backend database for querying. @@ -155,7 +167,34 @@ class DB(object): return await self.header_mc.branch_and_root(length, height) # Flushing - def flush_fs(self, to_height, to_tx_count, headers, block_tx_hashes): + def flush_dbs(self, flush_data, flush_utxos): + '''Flush out cached state. History is always flushed; UTXOs are + flushed if flush_utxos.''' + start_time = time.time() + tx_delta = flush_data.tx_count - self.last_flush_tx_count + + # Flush to file system + self.flush_fs(flush_data) + + # Then history + self.flush_history() + + # Flush state last as it reads the wall time. + with self.utxo_db.write_batch() as batch: + if flush_utxos: + self.flush_utxo_db(batch, flush_data) + self.flush_state(batch) + + # Update and put the wall time again - otherwise we drop the + # time it took to commit the batch + self.flush_state(self.utxo_db) + + elapsed = self.last_flush - start_time + self.logger.info(f'flush #{self.history.flush_count:,d} took ' + f'{elapsed:.1f}s. Height {flush_data.height:,d} ' + f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})') + + def flush_fs(self, flush_data): '''Write headers, tx counts and block tx hashes to the filesystem. The first height to write is self.fs_height + 1. The FS @@ -164,38 +203,38 @@ class DB(object): ''' prior_tx_count = (self.tx_counts[self.fs_height] if self.fs_height >= 0 else 0) - assert len(block_tx_hashes) == len(headers) - assert to_height == self.fs_height + len(headers) - assert to_tx_count == self.tx_counts[-1] if self.tx_counts else 0 - assert len(self.tx_counts) == to_height + 1 - hashes = b''.join(block_tx_hashes) + assert len(flush_data.block_tx_hashes) == len(flush_data.headers) + assert flush_data.height == self.fs_height + len(flush_data.headers) + assert flush_data.tx_count == (self.tx_counts[-1] if self.tx_counts + else 0) + assert len(self.tx_counts) == flush_data.height + 1 + hashes = b''.join(flush_data.block_tx_hashes) assert len(hashes) % 32 == 0 - assert len(hashes) // 32 == to_tx_count - prior_tx_count + assert len(hashes) // 32 == flush_data.tx_count - prior_tx_count # Write the headers, tx counts, and tx hashes start_time = time.time() height_start = self.fs_height + 1 offset = self.header_offset(height_start) - self.headers_file.write(offset, b''.join(headers)) - self.fs_update_header_offsets(offset, height_start, headers) + self.headers_file.write(offset, b''.join(flush_data.headers)) + self.fs_update_header_offsets(offset, height_start, flush_data.headers) offset = height_start * self.tx_counts.itemsize self.tx_counts_file.write(offset, self.tx_counts[height_start:].tobytes()) offset = prior_tx_count * 32 self.hashes_file.write(offset, hashes) - self.fs_height = to_height - self.fs_tx_count = to_tx_count + self.fs_height = flush_data.height + self.fs_tx_count = flush_data.tx_count if self.utxo_db.for_sync: elapsed = time.time() - start_time - self.logger.info(f'flushed to FS in {elapsed:.2f}s') + self.logger.info(f'flushed filesystem data in {elapsed:.2f}s') def flush_history(self): self.history.flush() - def flush_utxo_db(self, batch, deletes, adds, undo_infos, - to_height, to_tx_count, to_tip): + def flush_utxo_db(self, batch, flush_data): '''Flush the cached DB writes and UTXO set to the batch.''' # Care is needed because the writes generated by flushing the # UTXO state may have keys in common with our write cache or @@ -204,12 +243,12 @@ class DB(object): # Spends batch_delete = batch.delete - for key in sorted(deletes): + for key in sorted(flush_data.deletes): batch_delete(key) # New UTXOs batch_put = batch.put - for key, value in adds.items(): + for key, value in flush_data.adds.items(): # suffix = tx_idx + tx_num hashX = value[:-12] suffix = key[-2:] + value[-12:-8] @@ -217,21 +256,23 @@ class DB(object): batch_put(b'u' + hashX + suffix, value[-8:]) # New undo information - self.flush_undo_infos(batch_put, undo_infos) + self.flush_undo_infos(batch_put, flush_data.undo_infos) if self.utxo_db.for_sync: - block_count = to_height - self.db_height - tx_count = to_tx_count - self.db_tx_count + block_count = flush_data.height - self.db_height + tx_count = flush_data.tx_count - self.db_tx_count + add_count = len(flush_data.adds) + spend_count = len(flush_data.deletes) // 2 elapsed = time.time() - start_time self.logger.info(f'flushed {block_count:,d} blocks with ' - f'{tx_count:,d} txs, {len(adds):,d} UTXO adds, ' - f'{len(deletes) // 2:,d} spends in ' + f'{tx_count:,d} txs, {add_count:,d} UTXO adds, ' + f'{spend_count:,d} spends in ' f'{elapsed:.1f}s, committing...') self.utxo_flush_count = self.history.flush_count - self.db_height = to_height - self.db_tx_count = to_tx_count - self.db_tip = to_tip + self.db_height = flush_data.height + self.db_tx_count = flush_data.tx_count + self.db_tip = flush_data.tip def flush_state(self, batch): '''Flush chain state to the batch.'''