diff --git a/docs/ENVIRONMENT.rst b/docs/ENVIRONMENT.rst index 5a46695..6aa695f 100644 --- a/docs/ENVIRONMENT.rst +++ b/docs/ENVIRONMENT.rst @@ -284,20 +284,4 @@ your available physical RAM: versions, a value of 90% of the sum of the old UTXO_MB and HIST_MB variables is roughly equivalent. -Debugging ---------- - -The following are for debugging purposes: - -* **FORCE_REORG** - - If set to a positive integer, it will simulate a reorg of the - blockchain for that number of blocks on startup. You must have - synced before using this, otherwise there will be no undo - information. - - Although it should fail gracefully if set to a value greater than - **REORG_LIMIT**, I do not recommend it as I have not tried it and - there is a chance your DB might corrupt. - .. _lib/coins.py: https://github.com/kyuupichan/electrumx/blob/master/lib/coins.py diff --git a/lib/coins.py b/lib/coins.py index 3ed6ccb..47f1f3f 100644 --- a/lib/coins.py +++ b/lib/coins.py @@ -76,7 +76,7 @@ class Coin(object): Return the block less its unspendable coinbase. ''' - header = block[:cls.header_len(0)] + header = cls.block_header(block, 0) header_hex_hash = hash_to_str(cls.header_hash(header)) if header_hex_hash != cls.GENESIS_HASH: raise CoinError('genesis block has hash {} expected {}' @@ -217,15 +217,16 @@ class Coin(object): return cls.header_offset(height + 1) - cls.header_offset(height) @classmethod - def read_block(cls, block, height): - '''Returns a pair (header, tx_list) given a raw block and height. + def block_header(cls, block, height): + '''Returns the block header given a block and its height.''' + return block[:cls.header_len(height)] - tx_list is a list of (deserialized_tx, tx_hash) pairs. - ''' + @classmethod + def block_txs(cls, block, height): + '''Returns a list of (deserialized_tx, tx_hash) pairs given a + block and its height.''' deserializer = cls.deserializer() - hlen = cls.header_len(height) - header, rest = block[:hlen], block[hlen:] - return (header, deserializer(rest).read_block()) + return deserializer(block[cls.header_len(height):]).read_block() @classmethod def decimal_value(cls, value): diff --git a/server/block_processor.py b/server/block_processor.py index 8535b85..a6b3afe 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -197,56 +197,10 @@ class BlockProcessor(server.db.DB): self.logger.info('flushing DB cache at {:,d} MB' .format(self.cache_MB)) - async def main_loop(self, touched): - '''Main loop for block processing.''' - - # Simulate a reorg if requested - if self.env.force_reorg > 0: - self.logger.info('DEBUG: simulating reorg of {:,d} blocks' - .format(self.env.force_reorg)) - await self.handle_chain_reorg(set(), self.env.force_reorg) - - while True: - blocks = await self.prefetcher.get_blocks() - if blocks: - await self.advance_blocks(blocks, touched) - elif blocks is None: - break # Shutdown - else: - self.caught_up() - - self.logger.info('flushing state to DB for a clean shutdown...') - self.flush(True) - self.logger.info('shutdown complete') - - async def advance_blocks(self, blocks, touched): - '''Process the list of blocks passed. Detects and handles reorgs.''' - def job(): - for block in blocks: - self.advance_block(block, touched) - - start = time.time() + async def executor(self, func, *args, **kwargs): + '''Run func taking args in the executor.''' loop = asyncio.get_event_loop() - try: - await loop.run_in_executor(None, job) - except ChainReorg: - await self.handle_chain_reorg(touched) - - if self.caught_up_event.is_set(): - # Flush everything as queries are performed on the DB and - # not in-memory. - self.flush(True) - else: - touched.clear() - if time.time() > self.next_cache_check: - self.check_cache_size() - self.next_cache_check = time.time() + 30 - - if not self.first_sync: - s = '' if len(blocks) == 1 else 's' - self.logger.info('processed {:,d} block{} in {:.1f}s' - .format(len(blocks), s, - time.time() - start)) + await loop.run_in_executor(None, partial(func, *args, **kwargs)) def caught_up(self): '''Called when first caught up after starting.''' @@ -259,21 +213,93 @@ class BlockProcessor(server.db.DB): self.open_dbs() self.caught_up_event.set() + async def main_loop(self, touched): + '''Main loop for block processing.''' + + while True: + blocks = await self.prefetcher.get_blocks() + if blocks: + start = time.time() + await self.check_and_advance_blocks(blocks, touched) + if not self.first_sync: + s = '' if len(blocks) == 1 else 's' + self.logger.info('processed {:,d} block{} in {:.1f}s' + .format(len(blocks), s, + time.time() - start)) + elif blocks is None: + break # Shutdown + else: + self.caught_up() + + self.logger.info('flushing state to DB for a clean shutdown...') + self.flush(True) + self.logger.info('shutdown complete') + + async def check_and_advance_blocks(self, blocks, touched): + '''Process the list of blocks passed. Detects and handles reorgs.''' + first = self.height + 1 + headers = [self.coin.block_header(block, first + n) + for n, block in enumerate(blocks)] + hprevs = [self.coin.header_prevhash(h) for h in headers] + chain = [self.tip] + [self.coin.header_hash(h) for h in headers[:-1]] + + if hprevs == chain: + await self.executor(self.advance_blocks, blocks, headers, touched) + elif hprevs[0] != chain[0]: + await self.handle_chain_reorg(touched) + else: + # It is probably possible but extremely rare that what + # bitcoind returns doesn't form a chain because it + # reorg-ed the chain as it was processing the batched + # block hash requests. Should this happen it's simplest + # just to reset the prefetcher and try again. + self.logger.warning('daemon blocks do not form a chain; ' + 'resetting the prefetcher') + await self.prefetcher.clear(self.height) + + def advance_blocks(self, blocks, headers, touched): + '''Synchronously advance the blocks. + + It is already verified they correctly connect onto our tip. + ''' + block_txs = self.coin.block_txs + daemon_height = self.daemon.cached_height() + + for block in blocks: + self.height += 1 + txs = block_txs(block, self.height) + self.tx_hashes.append(b''.join(tx_hash for tx, tx_hash in txs)) + undo_info = self.advance_txs(txs, touched) + if daemon_height - self.height <= self.env.reorg_limit: + self.write_undo_info(self.height, b''.join(undo_info)) + + self.headers.extend(headers) + self.tip = self.coin.header_hash(headers[-1]) + + # If caught up, flush everything as client queries are + # performed on the DB. + if self.caught_up_event.is_set(): + self.flush(True) + else: + touched.clear() + if time.time() > self.next_cache_check: + self.check_cache_size() + self.next_cache_check = time.time() + 30 + async def handle_chain_reorg(self, touched, count=None): '''Handle a chain reorganisation. Count is the number of blocks to simulate a reorg, or None for a real reorg.''' self.logger.info('chain reorg detected') - self.flush(True) + await self.executor(self.flush, True) hashes = await self.reorg_hashes(count) # Reverse and convert to hex strings. hashes = [hash_to_str(hash) for hash in reversed(hashes)] for hex_hashes in chunks(hashes, 50): blocks = await self.daemon.raw_blocks(hex_hashes) - self.backup_blocks(blocks, touched) - + await self.executor(self.backup_blocks, blocks, touched) await self.prefetcher.clear(self.height) async def reorg_hashes(self, count): @@ -306,9 +332,10 @@ class BlockProcessor(server.db.DB): else: start = (self.height - count) + 1 - self.logger.info('chain was reorganised: {:,d} blocks at ' - 'heights {:,d}-{:,d} were replaced' - .format(count, start, start + count - 1)) + s = '' if count == 1 else 's' + self.logger.info('chain was reorganised replacing {:,d} block{} at ' + 'heights {:,d}-{:,d}' + .format(count, s, start, start + count - 1)) return self.fs_block_hashes(start, count) @@ -462,27 +489,6 @@ class BlockProcessor(server.db.DB): if utxo_MB + hist_MB >= self.cache_MB or hist_MB >= self.cache_MB // 5: self.flush(utxo_MB >= self.cache_MB * 4 // 5) - def fs_advance_block(self, header, txs): - '''Update unflushed FS state for a new block.''' - prior_tx_count = self.tx_counts[-1] if self.tx_counts else 0 - - # Cache the new header, tx hashes and cumulative tx count - self.headers.append(header) - self.tx_hashes.append(b''.join(tx_hash for tx, tx_hash in txs)) - self.tx_counts.append(prior_tx_count + len(txs)) - - def advance_block(self, block, touched): - header, txs = self.coin.read_block(block, self.height + 1) - if self.tip != self.coin.header_prevhash(header): - raise ChainReorg - - self.fs_advance_block(header, txs) - self.tip = self.coin.header_hash(header) - self.height += 1 - undo_info = self.advance_txs(txs, touched) - if self.daemon.cached_height() - self.height <= self.env.reorg_limit: - self.write_undo_info(self.height, b''.join(undo_info)) - def advance_txs(self, txs, touched): undo_info = [] @@ -524,6 +530,7 @@ class BlockProcessor(server.db.DB): tx_num += 1 self.tx_count = tx_num + self.tx_counts.append(tx_num) self.history_size = history_size return undo_info @@ -537,7 +544,7 @@ class BlockProcessor(server.db.DB): self.assert_flushed() for block in blocks: - header, txs = self.coin.read_block(block, self.height) + txs = self.coin.block_txs(block, self.height) header_hash = self.coin.header_hash(header) if header_hash != self.tip: raise ChainError('backup block {} is not tip {} at height {:,d}' diff --git a/server/controller.py b/server/controller.py index ce8bef4..1afe5eb 100644 --- a/server/controller.py +++ b/server/controller.py @@ -20,7 +20,7 @@ from lib.jsonrpc import JSONRPC, RequestBase import lib.util as util from server.block_processor import BlockProcessor from server.irc import IRC -from server.protocol import LocalRPC, ElectrumX +from server.session import LocalRPC, ElectrumX from server.mempool import MemPool diff --git a/server/env.py b/server/env.py index 647078d..196d24c 100644 --- a/server/env.py +++ b/server/env.py @@ -67,8 +67,6 @@ class Env(LoggedClass): if self.report_ssl_port else self.ssl_port) self.report_host_tor = self.default('REPORT_HOST_TOR', None) - # Debugging - self.force_reorg = self.integer('FORCE_REORG', 0) def default(self, envvar, default): return environ.get(envvar, default) diff --git a/server/protocol.py b/server/session.py similarity index 100% rename from server/protocol.py rename to server/session.py