diff --git a/electrumx/server/block_processor.py b/electrumx/server/block_processor.py index b30a0a4..8b2015c 100644 --- a/electrumx/server/block_processor.py +++ b/electrumx/server/block_processor.py @@ -683,46 +683,13 @@ class BlockProcessor(electrumx.server.db.DB): def flush_utxos(self, batch): '''Flush the cached DB writes and UTXO set to the batch.''' - # Care is needed because the writes generated by flushing the - # UTXO state may have keys in common with our write cache or - # may be in the DB already. - flush_start = time.time() - delete_count = len(self.db_deletes) // 2 - utxo_cache_len = len(self.utxo_cache) - - # Spends - batch_delete = batch.delete - for key in sorted(self.db_deletes): - batch_delete(key) + self.flush_utxo_db(batch, self.db_deletes, self.utxo_cache, + self.undo_infos, self.height, self.tx_count, + self.tip) self.db_deletes = [] - - # New UTXOs - batch_put = batch.put - for cache_key, cache_value in self.utxo_cache.items(): - # suffix = tx_idx + tx_num - hashX = cache_value[:-12] - suffix = cache_key[-2:] + cache_value[-12:-8] - batch_put(b'h' + cache_key[:4] + suffix, hashX) - batch_put(b'u' + hashX + suffix, cache_value[-8:]) self.utxo_cache = {} - - # New undo information - self.flush_undo_infos(batch_put, self.undo_infos) self.undo_infos = [] - if self.utxo_db.for_sync: - self.logger.info('flushed {:,d} blocks with {:,d} txs, {:,d} UTXO ' - 'adds, {:,d} spends in {:.1f}s, committing...' - .format(self.height - self.db_height, - self.tx_count - self.db_tx_count, - utxo_cache_len, delete_count, - time.time() - flush_start)) - - self.utxo_flush_count = self.history.flush_count - self.db_tx_count = self.tx_count - self.db_height = self.height - self.db_tip = self.tip - async def _process_prefetched_blocks(self): '''Loop forever processing blocks as they arrive.''' while True: diff --git a/electrumx/server/db.py b/electrumx/server/db.py index b705258..7beba1d 100644 --- a/electrumx/server/db.py +++ b/electrumx/server/db.py @@ -193,6 +193,45 @@ class DB(object): def flush_history(self): self.history.flush() + def flush_utxo_db(self, batch, deletes, adds, undo_infos, + to_height, to_tx_count, to_tip): + '''Flush the cached DB writes and UTXO set to the batch.''' + # Care is needed because the writes generated by flushing the + # UTXO state may have keys in common with our write cache or + # may be in the DB already. + start_time = time.time() + + # Spends + batch_delete = batch.delete + for key in sorted(deletes): + batch_delete(key) + + # New UTXOs + batch_put = batch.put + for key, value in adds.items(): + # suffix = tx_idx + tx_num + hashX = value[:-12] + suffix = key[-2:] + value[-12:-8] + batch_put(b'h' + key[:4] + suffix, hashX) + batch_put(b'u' + hashX + suffix, value[-8:]) + + # New undo information + self.flush_undo_infos(batch_put, undo_infos) + + if self.utxo_db.for_sync: + block_count = to_height - self.db_height + tx_count = to_tx_count - self.db_tx_count + elapsed = time.time() - start_time + self.logger.info(f'flushed {block_count:,d} blocks with ' + f'{tx_count:,d} txs, {len(adds):,d} UTXO adds, ' + f'{len(deletes) // 2:,d} spends in ' + f'{elapsed:.1f}s, committing...') + + self.utxo_flush_count = self.history.flush_count + self.db_height = to_height + self.db_tx_count = to_tx_count + self.db_tip = to_tip + def db_assert_flushed(self, to_tx_count, to_height): '''Asserts state is fully flushed.''' assert to_tx_count == self.fs_tx_count == self.db_tx_count