diff --git a/RELEASE-NOTES b/RELEASE-NOTES
index d283fc3..0c74d53 100644
--- a/RELEASE-NOTES
+++ b/RELEASE-NOTES
@@ -1,3 +1,14 @@
+version 0.7.12
+--------------
+
+- minor bug fixes: 2 in JSON RPC, 1 in get_utxos (affected addresslistunspent)
+- leveldb / rocksdb are opened with a different maximum open files limit,
+  depending on whether the chain has been fully synced or not.  If synced
+  you want the files for network sockets, if not synced for the DB engines.
+  Once synced the DB will be reopened with the lower limit to free up the
+  files for serving network connections
+- various refactoring preparing for possible asynchronous block processing
+
 version 0.7.11
 --------------
 
diff --git a/lib/jsonrpc.py b/lib/jsonrpc.py
index ee416e0..23bcd93 100644
--- a/lib/jsonrpc.py
+++ b/lib/jsonrpc.py
@@ -277,7 +277,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
         except RCPError:
             pass
         else:
-            self.handle_notification(method, params)
+            await self.handle_notification(method, params)
         return None
 
     async def json_request(self, message):
@@ -297,7 +297,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
             await self.handle_response(message['result'], None, message['id'])
         return None
 
-    def raise_unknown_method(method):
+    def raise_unknown_method(self, method):
        '''Respond to a request with an unknown method.'''
         raise self.RPCError('unknown method: "{}"'.format(method),
                             self.METHOD_NOT_FOUND)
diff --git a/server/block_processor.py b/server/block_processor.py
index a1619cb..1e661ac 100644
--- a/server/block_processor.py
+++ b/server/block_processor.py
@@ -23,24 +23,22 @@ from lib.util import chunks, formatted_time, LoggedClass
 import server.db
 
 
-class ChainError(Exception):
-    pass
-
-
 class Prefetcher(LoggedClass):
     '''Prefetches blocks (in the forward direction only).'''
 
-    def __init__(self, daemon, height):
+    def __init__(self, tasks, daemon, height):
         super().__init__()
+        self.tasks = tasks
         self.daemon = daemon
         self.semaphore = asyncio.Semaphore()
-        self.queue = asyncio.Queue()
-        self.queue_size = 0
         self.caught_up = False
         self.fetched_height = height
+        # A list of (blocks, size) pairs.  Earliest last.
+        self.cache = []
+        self.cache_size = 0
         # Target cache size.  Has little effect on sync time.
         self.target_cache_size = 10 * 1024 * 1024
-        # First fetch to be 10 blocks
+        # This makes the first fetch be 10 blocks
         self.ave_size = self.target_cache_size // 10
 
     async def clear(self, height):
@@ -53,19 +51,19 @@ class Prefetcher(LoggedClass):
         with await self.semaphore:
-            while not self.queue.empty():
-                self.queue.get_nowait()
-            self.queue_size = 0
+            self.cache = []
+            self.cache_size = 0
             self.fetched_height = height
-            self.caught_up = False
+            self.logger.info('reset to height {:,d}'.format(height))
 
-    async def get_blocks(self):
-        '''Blocking function that returns prefetched blocks.
-
-        The returned result empty just once - when the prefetcher
-        has caught up with the daemon.
-        '''
-        blocks, size = await self.queue.get()
-        self.queue_size -= size
-        return blocks
+    def get_blocks(self):
+        '''Return the next list of blocks from our prefetch cache.'''
+        # Cache might be empty after a clear()
+        if self.cache:
+            blocks, size = self.cache.pop()
+            self.cache_size -= size
+            return blocks
+        return []
 
     async def main_loop(self):
         '''Loop forever polling for more blocks.'''
@@ -73,9 +71,13 @@
                          .format(await self.daemon.height()))
         while True:
             try:
-                with await self.semaphore:
-                    await self._prefetch()
-                await asyncio.sleep(5 if self.caught_up else 0)
+                secs = 0
+                if self.cache_size < self.target_cache_size:
+                    if not await self._prefetch():
+                        self.caught_up = True
+                        secs = 5
+                self.tasks.put_nowait(None)
+                await asyncio.sleep(secs)
             except DaemonError as e:
                 self.logger.info('ignoring daemon error: {}'.format(e))
             except asyncio.CancelledError:
@@ -83,41 +85,41 @@
                 break
 
     async def _prefetch(self):
         '''Prefetch blocks unless the prefetch queue is full.'''
-        if self.queue_size >= self.target_cache_size:
-            return
-
         daemon_height = await self.daemon.height()
         cache_room = self.target_cache_size // self.ave_size
 
-        # Try and catch up all blocks but limit to room in cache.
-        # Constrain count to between 0 and 4000 regardless
-        count = min(daemon_height - self.fetched_height, cache_room)
-        count = min(4000, max(count, 0))
-        if not count:
-            # Indicate when we have caught up for the first time only
-            if not self.caught_up:
-                self.caught_up = True
-                self.queue.put_nowait(([], 0))
-            return
+        with await self.semaphore:
+            # Try and catch up all blocks but limit to room in cache.
+            # Constrain count to between 0 and 4000 regardless
+            count = min(daemon_height - self.fetched_height, cache_room)
+            count = min(4000, max(count, 0))
+            if not count:
+                return 0
 
-        first = self.fetched_height + 1
-        hex_hashes = await self.daemon.block_hex_hashes(first, count)
-        if self.caught_up:
-            self.logger.info('new block height {:,d} hash {}'
-                             .format(first + count - 1, hex_hashes[-1]))
-        blocks = await self.daemon.raw_blocks(hex_hashes)
-        size = sum(len(block) for block in blocks)
+            first = self.fetched_height + 1
+            hex_hashes = await self.daemon.block_hex_hashes(first, count)
+            if self.caught_up:
+                self.logger.info('new block height {:,d} hash {}'
+                                 .format(first + count - 1, hex_hashes[-1]))
+            blocks = await self.daemon.raw_blocks(hex_hashes)
 
-        # Update our recent average block size estimate
-        if count >= 10:
-            self.ave_size = size // count
-        else:
-            self.ave_size = (size + (10 - count) * self.ave_size) // 10
+            size = sum(len(block) for block in blocks)
 
-        self.fetched_height += len(blocks)
-        self.queue.put_nowait((blocks, size))
-        self.queue_size += size
+            # Update our recent average block size estimate
+            if count >= 10:
+                self.ave_size = size // count
+            else:
+                self.ave_size = (size + (10 - count) * self.ave_size) // 10
 
+            self.cache.insert(0, (blocks, size))
+            self.cache_size += size
+            self.fetched_height += len(blocks)
+
+        return count
+
+
+class ChainError(Exception):
+    '''Raised on error processing blocks.'''
 
 
 class ChainReorg(Exception):
     '''Raised on a blockchain reorganisation.'''
@@ -135,6 +137,9 @@
 
         self.client = client
 
+        # The block processor reads its tasks from this queue
+        self.tasks = asyncio.Queue()
+
         # These are our state as we move ahead of DB state
         self.fs_height = self.db_height
         self.fs_tx_count = self.db_tx_count
@@ -144,6 +149,7 @@
 
         self.daemon = Daemon(self.coin.daemon_urls(env.daemon_url))
         self.caught_up = False
+        self._shutdown = False
         self.event = asyncio.Event()
 
         # Meta
@@ -154,7 +160,7 @@
         # Headers and tx_hashes have one entry per block
         self.history = defaultdict(partial(array.array, 'I'))
         self.history_size = 0
-        self.prefetcher = Prefetcher(self.daemon, self.height)
+        self.prefetcher = Prefetcher(self.tasks, self.daemon, self.height)
 
         self.last_flush = time.time()
         self.last_flush_tx_count = self.tx_count
@@ -176,33 +182,33 @@
     async def main_loop(self):
         '''Main loop for block processing.'''
-        try:
-            # Simulate a reorg if requested
-            if self.env.force_reorg > 0:
-                self.logger.info('DEBUG: simulating reorg of {:,d} blocks'
-                                 .format(self.env.force_reorg))
-                await self.handle_chain_reorg(self.env.force_reorg, set())
-            while True:
-                await self._wait_for_update()
-        except asyncio.CancelledError:
-            pass
+        # Simulate a reorg if requested
+        if self.env.force_reorg > 0:
+            self.logger.info('DEBUG: simulating reorg of {:,d} blocks'
+                             .format(self.env.force_reorg))
+            await self.handle_chain_reorg(set(), self.env.force_reorg)
+
+        while True:
+            task = await self.tasks.get()
+            if self._shutdown:
+                break
+            blocks = self.prefetcher.get_blocks()
+            if blocks:
+                await self.advance_blocks(blocks)
+            elif not self.caught_up:
+                self.caught_up = True
+                self.first_caught_up()
 
-    async def shutdown(self):
-        '''Shut down the DB cleanly.'''
-        self.logger.info('flushing state to DB for clean shutdown...')
         self.flush(True)
 
-    async def _wait_for_update(self):
-        '''Wait for the prefetcher to deliver blocks.
-
-        Blocks are only processed in the forward direction.
-        '''
-        blocks = await self.prefetcher.get_blocks()
-        if not blocks:
-            self.first_caught_up()
-            return
+    def shutdown(self):
+        '''Call to shut down the block processor.'''
+        self.logger.info('flushing state to DB for clean shutdown...')
+        self._shutdown = True
+        self.tasks.put_nowait(None)
 
+    async def advance_blocks(self, blocks):
         '''Strip the unspendable genesis coinbase.'''
         if self.height == -1:
             blocks[0] = blocks[0][:self.coin.HEADER_LEN] + bytes(1)
@@ -213,11 +219,12 @@
                 self.advance_block(block, touched)
                 await asyncio.sleep(0)  # Yield
             except ChainReorg:
-                await self.handle_chain_reorg(None, touched)
+                await self.handle_chain_reorg(touched)
 
         if self.caught_up:
             # Flush everything as queries are performed on the DB and
             # not in-memory.
+            await asyncio.sleep(0)
             self.flush(True)
             self.client.notify(touched)
         elif time.time() > self.next_cache_check:
@@ -225,23 +232,23 @@
             self.next_cache_check = time.time() + 60
 
     def first_caught_up(self):
-        '''Called when first caught up after start, or after a reorg.'''
-        self.caught_up = True
-        if self.first_sync:
-            self.first_sync = False
-            self.logger.info('{} synced to height {:,d}.  DB version:'
-                             .format(VERSION, self.height, self.db_version))
+        '''Called when first caught up after starting.'''
         self.flush(True)
+        if self.first_sync:
+            self.logger.info('{} synced to height {:,d}'
+                             .format(VERSION, self.height))
+            self.first_sync = False
+            self.flush_state(self.db)
+            self.reopen_db(False)
         self.event.set()
 
-    async def handle_chain_reorg(self, count, touched):
+    async def handle_chain_reorg(self, touched, count=None):
         '''Handle a chain reorganisation.
 
         Count is the number of blocks to simulate a reorg, or None for
         a real reorg.'''
         self.logger.info('chain reorg detected')
         self.flush(True)
-        self.logger.info('finding common height...')
 
         hashes = await self.reorg_hashes(count)
         # Reverse and convert to hex strings.
@@ -251,7 +258,6 @@
             self.backup_blocks(blocks, touched)
 
         await self.prefetcher.clear(self.height)
-        self.logger.info('prefetcher reset')
 
     async def reorg_hashes(self, count):
         '''Return the list of hashes to back up beacuse of a reorg.
@@ -305,12 +311,11 @@
         assert not self.utxo_cache
         assert not self.db_deletes
 
-    def flush(self, flush_utxos=False, flush_history=None):
+    def flush(self, flush_utxos=False):
         '''Flush out cached state.
 
         History is always flushed.  UTXOs are flushed if flush_utxos.'''
         if self.height == self.db_height:
-            assert flush_history is None
             self.assert_flushed()
             return
@@ -319,14 +324,10 @@
         last_flush = self.last_flush
         tx_diff = self.tx_count - self.last_flush_tx_count
 
-        if self.height > self.db_height:
-            assert flush_history is None
-            flush_history = self.flush_history
-
         with self.db.write_batch() as batch:
             # History first - fast and frees memory.  Flush state last
             # as it reads the wall time.
-            flush_history(batch)
+            self.flush_history(batch)
             if flush_utxos:
                 self.flush_utxos(batch)
             self.flush_state(batch)
@@ -394,12 +395,36 @@
         self.tx_hashes = []
         self.headers = []
 
-    def backup_history(self, batch, hash168s):
-        self.logger.info('backing up history to height {:,d} tx_count {:,d}'
-                         .format(self.height, self.tx_count))
+    def backup_flush(self, hash168s):
+        '''Like flush() but when backing up.  All UTXOs are flushed.
+
+        hash168s - sequence of hash168s which were touched by backing
+        up.  Searched for history entries to remove after the backup
+        height.
+        '''
+        assert self.height < self.db_height
         assert not self.history
 
+        self.flush_count += 1
+        flush_start = time.time()
+
+        with self.db.write_batch() as batch:
+            # Flush state last as it reads the wall time.
+            self.backup_history(batch, hash168s)
+            self.flush_utxos(batch)
+            self.flush_state(batch)
+
+        # Update and put the wall time again - otherwise we drop the
+        # time it took to commit the batch
+        self.flush_state(self.db)
+
+        self.logger.info('backup flush #{:,d} took {:.1f}s.  '
+                         'Height {:,d} txs: {:,d}'
+                         .format(self.flush_count,
+                                 self.last_flush - flush_start,
+                                 self.height, self.tx_count))
+
+    def backup_history(self, batch, hash168s):
         nremoves = 0
         for hash168 in sorted(hash168s):
             prefix = b'H' + hash168
@@ -426,8 +451,8 @@
         assert not self.headers
         assert not self.tx_hashes
 
-        self.logger.info('removed {:,d} history entries from {:,d} addresses'
-                         .format(nremoves, len(hash168s)))
+        self.logger.info('backing up removed {:,d} history entries from '
+                         '{:,d} addresses'.format(nremoves, len(hash168s)))
 
     def check_cache_size(self):
         '''Flush a cache if it gets too big.'''
@@ -462,9 +487,6 @@
         self.tx_counts.append(prior_tx_count + len(txs))
 
     def advance_block(self, block, touched):
-        # We must update the FS cache before calling advance_txs() as
-        # the UTXO cache uses the FS cache via get_tx_hash() to
-        # resolve compressed key collisions
         header, tx_hashes, txs = self.coin.read_block(block)
         if self.tip != self.coin.header_prevhash(header):
             raise ChainReorg
@@ -477,43 +499,47 @@
             self.write_undo_info(self.height, b''.join(undo_info))
 
     def advance_txs(self, tx_hashes, txs, touched):
-        put_utxo = self.utxo_cache.__setitem__
-        spend_utxo = self.spend_utxo
         undo_info = []
 
         # Use local vars for speed in the loops
         history = self.history
+        history_size = self.history_size
         tx_num = self.tx_count
         script_hash168 = self.coin.hash168_from_script()
         s_pack = pack
+        put_utxo = self.utxo_cache.__setitem__
+        spend_utxo = self.spend_utxo
+        undo_info_append = undo_info.append
 
         for tx, tx_hash in zip(txs, tx_hashes):
             hash168s = set()
+            add_hash168 = hash168s.add
             tx_numb = s_pack('<I', tx_num)
diff --git a/server/db.py b/server/db.py
@@ -332,7 +349,7 @@
                 if limit == 0:
                     return
                 limit -= 1
-                tx_num, tx_pos = s_unpack('
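
A note on the design this diff introduces: the prefetcher no longer hands
blocks to the block processor through a blocking asyncio.Queue; it fills a
plain list cache and puts a dummy item on a `tasks` queue purely to wake the
processor, which then drains the cache synchronously.  A minimal sketch of
that wakeup pattern, using hypothetical toy classes (the real Prefetcher and
BlockProcessor carry far more state):

    import asyncio

    class ToyPrefetcher:
        '''Producer: fills a cache, then pokes the processor's task queue.'''
        def __init__(self, tasks):
            self.tasks = tasks
            self.cache = []   # the real code stores (blocks, size) pairs

        async def main_loop(self):
            for height in range(3):              # stands in for daemon polling
                self.cache.insert(0, ['block {}'.format(height)])
                self.tasks.put_nowait(None)      # wake the consumer; value unused
                await asyncio.sleep(0)

    class ToyProcessor:
        '''Consumer: sleeps on the queue, then drains the prefetch cache.'''
        def __init__(self, prefetcher, tasks):
            self.prefetcher = prefetcher
            self.tasks = tasks

        async def main_loop(self):
            for _ in range(3):
                await self.tasks.get()           # idle until poked
                if self.prefetcher.cache:
                    print('processing', self.prefetcher.cache.pop())

    async def demo():
        tasks = asyncio.Queue()
        prefetcher = ToyPrefetcher(tasks)
        await asyncio.gather(prefetcher.main_loop(),
                             ToyProcessor(prefetcher, tasks).main_loop())

    asyncio.run(demo())

Because the queue carries wakeups rather than data, the same queue can signal
shutdown: the new BlockProcessor.shutdown() sets a flag and puts a dummy item
so that main_loop()'s tasks.get() returns and observes the flag.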
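The prefetcher's _prefetch() sizes each request from a running estimate of the
average block size: a sample of ten or more blocks replaces the estimate
outright, while a smaller fetch is blended in as if it completed a notional
ten-block window.  A standalone illustration of that arithmetic, with made-up
numbers:

    def update_ave_size(ave_size, size, count):
        '''Fold a fetch of `count` blocks totalling `size` bytes into the
        running per-block estimate, as _prefetch() does.'''
        if count >= 10:
            return size // count
        # Small fetch: blend it into a notional 10-block window
        return (size + (10 - count) * ave_size) // 10

    ave = 1024 * 1024                       # initial estimate: 1 MiB per block
    ave = update_ave_size(ave, 1000000, 4)  # 4 blocks, 1,000,000 bytes total
    print(ave)                              # (1000000 + 6 * 1048576) // 10 == 729145

Dividing target_cache_size by this estimate gives cache_room, which caps how
many blocks the next request may ask the daemon for.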
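The RELEASE-NOTES entry on open-file limits corresponds to the reopen_db(False)
call added to first_caught_up(); the server/db.py side of the change is mostly
lost to the truncation above.  A sketch of the idea only, assuming the plyvel
LevelDB binding and with made-up limit values and database path:

    import plyvel

    SYNC_MAX_OPEN_FILES = 1024   # hypothetical: favour the DB engine while syncing
    SERVE_MAX_OPEN_FILES = 192   # hypothetical: leave descriptors for sockets

    def open_db(path, for_sync):
        '''Open LevelDB with a max-open-files limit suited to the phase.'''
        limit = SYNC_MAX_OPEN_FILES if for_sync else SERVE_MAX_OPEN_FILES
        return plyvel.DB(path, create_if_missing=True, max_open_files=limit)

    db = open_db('utxo', for_sync=True)
    # ... initial block download runs here ...
    db.close()                            # release the engine's file handles
    db = open_db('utxo', for_sync=False)  # reopen low; frees fds for sockets

RocksDB bindings typically expose an equivalent max_open_files option, so the
same open/close/reopen dance applies to either engine.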