diff --git a/electrumx/server/block_processor.py b/electrumx/server/block_processor.py index a7ecc0a..457d30b 100644 --- a/electrumx/server/block_processor.py +++ b/electrumx/server/block_processor.py @@ -261,7 +261,7 @@ class BlockProcessor(DB): for hex_hashes in chunks(hashes, 50): raw_blocks = await get_raw_blocks(last, hex_hashes) await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks) - await self.backup_flush() + await self.flush_for_backup() last -= len(raw_blocks) await self.prefetcher.reset_height(self.height) @@ -319,6 +319,16 @@ class BlockProcessor(DB): assert not self.db_deletes self.db_assert_flushed(self.tx_count, self.height) + async def flush_for_backup(self): + # self.touched can include other addresses which is + # harmless, but remove None. + self.touched.discard(None) + await self.run_in_thread_with_lock( + self.flush_backup, self.flush_data(), self.touched) + self.db_deletes = [] + self.utxo_cache = {} + self.undo_infos = [] + async def flush(self, flush_utxos): if self.height == self.db_height: self.assert_flushed() @@ -367,46 +377,6 @@ class BlockProcessor(DB): .format(formatted_time(self.wall_time), formatted_time(tx_est / this_tx_per_sec))) - async def backup_flush(self): - assert self.height < self.db_height - assert not self.headers - assert not self.tx_hashes - self.history.assert_flushed() - await self.run_in_thread_with_lock(self._backup_flush_body) - - def _backup_flush_body(self): - '''Like flush() but when backing up. All UTXOs are flushed. - - hashXs - sequence of hashXs which were touched by backing - up. Searched for history entries to remove after the backup - height. - ''' - flush_start = time.time() - - self.backup_fs(self.height, self.tx_count) - - # Backup history. self.touched can include other addresses - # which is harmless, but remove None. - self.touched.discard(None) - nremoves = self.history.backup(self.touched, self.tx_count) - self.logger.info('backing up removed {:,d} history entries' - .format(nremoves)) - - with self.utxo_db.write_batch() as batch: - self.flush_utxo_db(batch, self.flush_data()) - # Flush state last as it reads the wall time. - self.flush_state(batch) - - self.db_deletes = [] - self.utxo_cache = {} - self.undo_infos = [] - - self.logger.info('backup flush #{:,d} took {:.1f}s. ' - 'Height {:,d} txs: {:,d}' - .format(self.history.flush_count, - self.last_flush - flush_start, - self.height, self.tx_count)) - def check_cache_size(self): '''Flush a cache if it gets too big.''' # Good average estimates based on traversal of subobjects and diff --git a/electrumx/server/db.py b/electrumx/server/db.py index d5d62ca..861845c 100644 --- a/electrumx/server/db.py +++ b/electrumx/server/db.py @@ -282,6 +282,28 @@ class DB(object): self.last_flush_tx_count = self.fs_tx_count self.write_utxo_state(batch) + def flush_backup(self, flush_data, touched): + '''Like flush_dbs() but when backing up. All UTXOs are flushed.''' + assert not flush_data.headers + assert not flush_data.block_tx_hashes + assert flush_data.height < self.db_height + self.history.assert_flushed() + + start_time = time.time() + tx_delta = flush_data.tx_count - self.last_flush_tx_count + + self.backup_fs(flush_data.height, flush_data.tx_count) + self.history.backup(touched, flush_data.tx_count) + with self.utxo_db.write_batch() as batch: + self.flush_utxo_db(batch, flush_data) + # Flush state last as it reads the wall time. + self.flush_state(batch) + + elapsed = self.last_flush - start_time + self.logger.info(f'backup flush #{self.history.flush_count:,d} took ' + f'{elapsed:.1f}s. Height {flush_data.height:,d} ' + f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})') + def db_assert_flushed(self, to_tx_count, to_height): '''Asserts state is fully flushed.''' assert to_tx_count == self.fs_tx_count == self.db_tx_count diff --git a/electrumx/server/history.py b/electrumx/server/history.py index eaab4af..b42ca6c 100644 --- a/electrumx/server/history.py +++ b/electrumx/server/history.py @@ -85,7 +85,7 @@ class History(object): if flush_id > utxo_flush_count: keys.append(key) - self.logger.info('deleting {:,d} history entries'.format(len(keys))) + self.logger.info(f'deleting {len(keys):,d} history entries') self.flush_count = utxo_flush_count with self.db.write_batch() as batch: @@ -144,7 +144,6 @@ class History(object): self.logger.info(f'flushed history in {elapsed:.1f}s ' f'for {count:,d} addrs') - def backup(self, hashXs, tx_count): # Not certain this is needed, but it doesn't hurt self.flush_count += 1 @@ -172,7 +171,7 @@ class History(object): batch.put(key, value) self.write_state(batch) - return nremoves + self.logger.info(f'backing up removed {nremoves:,d} history entries') def get_txnums(self, hashX, limit=1000): '''Generator that returns an unpruned, sorted list of tx_nums in the