diff --git a/electrumx/server/block_processor.py b/electrumx/server/block_processor.py index 55c6ef2..8d7e33f 100644 --- a/electrumx/server/block_processor.py +++ b/electrumx/server/block_processor.py @@ -310,23 +310,37 @@ class BlockProcessor(electrumx.server.db.DB): if count is None: self.logger.info('chain reorg detected') else: - self.logger.info('faking a reorg of {:,d} blocks'.format(count)) + self.logger.info(f'faking a reorg of {count:,d} blocks') await self.tasks.run_in_thread(self.flush, True) - hashes = await self.reorg_hashes(count) + async def get_raw_blocks(last_height, hex_hashes): + heights = range(last_height, last_height - len(hex_hashes), -1) + try: + blocks = [self.read_raw_block(height) for height in heights] + self.logger.info(f'read {len(blocks)} blocks from disk') + return blocks + except Exception: + return await self.daemon.raw_blocks(hex_hashes) + + start, hashes = await self.reorg_hashes(count) # Reverse and convert to hex strings. hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)] + last = start + count - 1 for hex_hashes in chunks(hashes, 50): - blocks = await self.daemon.raw_blocks(hex_hashes) - await self.tasks.run_in_thread(self.backup_blocks, blocks) + raw_blocks = await get_raw_blocks(last, hex_hashes) + await self.tasks.run_in_thread(self.backup_blocks, raw_blocks) + last -= len(raw_blocks) # Truncate header_mc: header count is 1 more than the height self.header_mc.truncate(self.height + 1) await self.prefetcher.reset_height() async def reorg_hashes(self, count): - '''Return the list of hashes to back up beacuse of a reorg. + '''Return a pair (start, hashes) of blocks to back up during a + reorg. - The hashes are returned in order of increasing height.''' + The hashes are returned in order of increasing height. Start + is the height of the first hash. + ''' def diff_pos(hashes1, hashes2): '''Returns the index of the first difference in the hash lists. @@ -360,7 +374,7 @@ class BlockProcessor(electrumx.server.db.DB): 'heights {:,d}-{:,d}' .format(count, s, start, start + count - 1)) - return self.fs_block_hashes(start, count) + return start, self.fs_block_hashes(start, count) def flush_state(self, batch): '''Flush chain state to the batch.''' @@ -527,6 +541,7 @@ class BlockProcessor(electrumx.server.db.DB): undo_info = self.advance_txs(block.transactions) if height >= min_height: self.undo_infos.append((undo_info, height)) + self.write_raw_block(block.raw, height) headers = [block.header for block in blocks] self.height = height diff --git a/electrumx/server/db.py b/electrumx/server/db.py index 2dd234a..25be167 100644 --- a/electrumx/server/db.py +++ b/electrumx/server/db.py @@ -12,9 +12,10 @@ import array import ast import os -from struct import pack, unpack from bisect import bisect_right from collections import namedtuple +from glob import glob +from struct import pack, unpack import electrumx.lib.util as util from electrumx.lib.hash import hash_to_hex_str, HASHX_LEN @@ -265,6 +266,29 @@ class DB(object): for undo_info, height in undo_infos: batch_put(self.undo_key(height), b''.join(undo_info)) + def raw_block_prefix(self): + return 'meta/block' + + def raw_block_path(self, height): + return f'{self.raw_block_prefix()}{height:d}' + + def read_raw_block(self, height): + '''Returns a raw block read from disk. Raises FileNotFoundError + if the block isn't on-disk.''' + with util.open_file(self.raw_block_path(height)) as f: + return f.read(-1) + + def write_raw_block(self, block, height): + '''Write a raw block to disk.''' + with util.open_truncate(self.raw_block_path(height)) as f: + f.write(block) + # Delete old blocks to prevent them accumulating + try: + del_height = self.min_undo_height(height) - 1 + os.remove(self.raw_block_path(del_height)) + except FileNotFoundError: + pass + def clear_excess_undo_info(self): '''Clear excess undo info. Only most recent N are kept.''' prefix = b'U' @@ -280,8 +304,20 @@ class DB(object): with self.utxo_db.write_batch() as batch: for key in keys: batch.delete(key) - self.logger.info('deleted {:,d} stale undo entries' - .format(len(keys))) + self.logger.info(f'deleted {len(keys):,d} stale undo entries') + + # delete old block files + prefix = self.raw_block_prefix() + paths = [path for path in glob(f'{prefix}[0-9]*') + if len(path) > len(prefix) + and int(path[len(prefix):]) < min_height] + if paths: + for path in paths: + try: + os.remove(path) + except FileNotFoundError: + pass + self.logger.info(f'deleted {len(paths):,d} stale block files') # -- UTXO database