From 3972e18e98ded0c20964109f6e7a9140290bee93 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Tue, 8 Nov 2016 23:16:12 +0900 Subject: [PATCH] Move the rest of fs_cache into BlockProcessor --- server/block_processor.py | 110 +++++++++++++++++++++++--------------- server/db.py | 84 +++++++++++------------------ 2 files changed, 98 insertions(+), 96 deletions(-) diff --git a/server/block_processor.py b/server/block_processor.py index 679bc05..a34e6cd 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -298,6 +298,11 @@ class BlockProcessor(server.db.DB): daemon and a new block arrives or the mempool is updated.''' super().__init__(env) + # These are our state as we move ahead of DB state + self.height = self.db_height + self.tip = self.db_tip + self.tx_count = self.db_tx_count + self.daemon = Daemon(env.daemon_url, env.debug) self.daemon.debug_set_height(self.height) self.mempool = MemPool(self) @@ -317,8 +322,10 @@ class BlockProcessor(server.db.DB): self.last_flush = time.time() self.last_flush_tx_count = self.tx_count - # UTXO cache + # Caches of unflushed items self.utxo_cache = UTXOCache(self.get_tx_hash, self.db, self.coin) + self.headers = [] + self.tx_hashes = [] # Log state self.logger.info('{}/{} height: {:,d} tx count: {:,d} ' @@ -413,7 +420,7 @@ class BlockProcessor(server.db.DB): start = self.height - 1 count = 1 while start > 0: - hashes = self.block_hashes(start, count) + hashes = self.fs_block_hashes(start, count) hex_hashes = [hash_to_str(hash) for hash in hashes] d_hex_hashes = await self.daemon.block_hex_hashes(start, count) n = match_pos(hex_hashes, d_hex_hashes) @@ -430,7 +437,7 @@ class BlockProcessor(server.db.DB): 'height {:,d} to height {:,d}' .format(count, start, start + count - 1)) - return self.block_hashes(start, count) + return self.fs_block_hashes(start, count) def clean_db(self): '''Clean out stale DB items. 
@@ -504,9 +511,6 @@ class BlockProcessor(server.db.DB): self.height - self.db_height)) self.utxo_cache.flush(batch) self.utxo_flush_count = self.flush_count - self.db_tx_count = self.tx_count - self.db_height = self.height - self.db_tip = self.tip def assert_flushed(self): '''Asserts state is fully flushed.''' @@ -524,39 +528,40 @@ class BlockProcessor(server.db.DB): self.assert_flushed() return + self.flush_count += 1 flush_start = time.time() last_flush = self.last_flush tx_diff = self.tx_count - self.last_flush_tx_count show_stats = self.first_sync - # Write out the files to the FS before flushing to the DB. If - # the DB transaction fails, the files being too long doesn't - # matter. But if writing the files fails we do not want to - # have updated the DB. if self.height > self.db_height: assert flush_history is None flush_history = self.flush_history - self.fs_flush() - self.logger.info('FS flush took {:.1f} seconds' - .format(time.time() - flush_start)) with self.db.write_batch() as batch: # History first - fast and frees memory. Flush state last # as it reads the wall time. 
flush_history(batch) if flush_utxos: + self.fs_flush() self.flush_utxos(batch) self.flush_state(batch) self.logger.info('committing transaction...') + # Update our in-memory state after successful flush + self.db_tx_count = self.tx_count + self.db_height = self.height + self.db_tip = self.tip + self.tx_hashes = [] + self.headers = [] + # Update and put the wall time again - otherwise we drop the # time it took to commit the batch self.flush_state(self.db) - flush_time = int(self.last_flush - flush_start) self.logger.info('flush #{:,d} to height {:,d} txs: {:,d} took {:,d}s' .format(self.flush_count, self.height, self.tx_count, - flush_time)) + int(self.last_flush - flush_start))) # Catch-up stats if show_stats: @@ -582,31 +587,30 @@ class BlockProcessor(server.db.DB): formatted_time(tx_est / this_tx_per_sec))) def flush_history(self, batch): - self.logger.info('flushing history') - - self.flush_count += 1 + flush_start = time.time() flush_id = struct.pack('>H', self.flush_count) for hash168, hist in self.history.items(): key = b'H' + hash168 + flush_id batch.put(key, hist.tobytes()) - self.logger.info('{:,d} history entries in {:,d} addrs' - .format(self.history_size, len(self.history))) - + self.logger.info('flushed {:,d} history entries for {:,d} addrs ' + 'in {:,d}s' + .format(self.history_size, len(self.history), + int(time.time() - flush_start))) self.history = defaultdict(partial(array.array, 'I')) self.history_size = 0 def fs_flush(self): - '''Flush the things stored on the filesystem. 
- The arguments are passed for sanity check assertions only.''' + '''Flush the things stored on the filesystem.''' + flush_start = time.time() blocks_done = len(self.headers) - prior_tx_count = (self.tx_counts[self.fs_height] - if self.fs_height >= 0 else 0) + prior_tx_count = (self.tx_counts[self.db_height] + if self.db_height >= 0 else 0) cur_tx_count = self.tx_counts[-1] if self.tx_counts else 0 txs_done = cur_tx_count - prior_tx_count - assert self.fs_height + blocks_done == self.height + assert self.db_height + blocks_done == self.height assert len(self.tx_hashes) == blocks_done assert len(self.tx_counts) == self.height + 1 assert cur_tx_count == self.tx_count, \ @@ -615,13 +619,13 @@ class BlockProcessor(server.db.DB): # First the headers headers = b''.join(self.headers) header_len = self.coin.HEADER_LEN - self.headers_file.seek((self.fs_height + 1) * header_len) + self.headers_file.seek((self.db_height + 1) * header_len) self.headers_file.write(headers) self.headers_file.flush() # Then the tx counts - self.txcount_file.seek((self.fs_height + 1) * self.tx_counts.itemsize) - self.txcount_file.write(self.tx_counts[self.fs_height + 1:]) + self.txcount_file.seek((self.db_height + 1) * self.tx_counts.itemsize) + self.txcount_file.write(self.tx_counts[self.db_height + 1:]) self.txcount_file.flush() # Finally the hashes @@ -642,9 +646,8 @@ class BlockProcessor(server.db.DB): os.sync() - self.tx_hashes = [] - self.headers = [] - self.fs_height += blocks_done + self.logger.info('FS flush took {:.1f} seconds' + .format(time.time() - flush_start)) def backup_history(self, batch, hash168s): self.logger.info('backing up history to height {:,d} tx_count {:,d}' @@ -795,16 +798,6 @@ class BlockProcessor(server.db.DB): return undo_info - def fs_backup_block(self): - '''Revert a block.''' - assert not self.headers - assert not self.tx_hashes - assert self.fs_height >= 0 - # Just update in-memory. 
It doesn't matter if disk files are - # too long, they will be overwritten when advancing. - self.fs_height -= 1 - self.tx_counts.pop() - def backup_blocks(self, blocks): '''Backup the blocks and flush. @@ -824,10 +817,13 @@ class BlockProcessor(server.db.DB): hash_to_str(self.tip), self.height)) self.backup_txs(tx_hashes, txs, touched) - self.fs_backup_block() self.tip = prev_hash + assert self.height >= 0 self.height -= 1 + assert not self.headers + assert not self.tx_hashes + self.logger.info('backed up to height {:,d}'.format(self.height)) self.touched.update(touched) @@ -866,6 +862,34 @@ class BlockProcessor(server.db.DB): assert n == 0 self.tx_count -= len(txs) + def read_headers(self, start, count): + # Read some from disk + disk_count = min(count, self.db_height + 1 - start) + result = self.fs_read_headers(start, disk_count) + count -= disk_count + start += disk_count + + # The rest from memory + if count: + start -= self.db_height + 1 + if not (count >= 0 and start + count <= len(self.headers)): + raise ChainError('{:,d} headers starting at {:,d} not on disk' + .format(count, start)) + result += b''.join(self.headers[start: start + count]) + + return result + + def get_tx_hash(self, tx_num): + '''Returns the tx_hash and height of a tx number.''' + tx_hash, tx_height = self.fs_tx_hash(tx_num) + + # Is this unflushed? + if tx_hash is None: + tx_hashes = self.tx_hashes[tx_height - (self.db_height + 1)] + tx_hash = tx_hashes[tx_num - self.tx_counts[tx_height - 1]] + + return tx_hash, tx_height + def mempool_transactions(self, hash168): '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool entries for the hash168. 
diff --git a/server/db.py b/server/db.py index fd2d65f..414eb75 100644 --- a/server/db.py +++ b/server/db.py @@ -49,35 +49,22 @@ class DB(LoggedClass): else: self.logger.info('successfully opened {} database {}' .format(env.db_engine, db_name)) - self.init_state_from_db() - self.tx_count = self.db_tx_count - self.height = self.db_height - self.tip = self.db_tip - # -- FS related members -- - self.tx_hash_file_size = 16 * 1024 * 1024 - - # On-disk height updated by a flush - self.fs_height = self.height - - # Unflushed items - self.headers = [] - self.tx_hashes = [] - - create = self.height == -1 + create = self.db_height == -1 self.headers_file = self.open_file('headers', create) self.txcount_file = self.open_file('txcount', create) + self.tx_hash_file_size = 16 * 1024 * 1024 # tx_counts[N] has the cumulative number of txs at the end of # height N. So tx_counts[0] is 1 - the genesis coinbase self.tx_counts = array.array('I') self.txcount_file.seek(0) - self.tx_counts.fromfile(self.txcount_file, self.height + 1) + self.tx_counts.fromfile(self.txcount_file, self.db_height + 1) if self.tx_counts: - assert self.tx_count == self.tx_counts[-1] + assert self.db_tx_count == self.tx_counts[-1] else: - assert self.tx_count == 0 + assert self.db_tx_count == 0 def init_state_from_db(self): if self.db.is_new: @@ -112,46 +99,37 @@ class DB(LoggedClass): return open(filename, 'wb+') raise - def read_headers(self, start, count): - result = b'' - + def fs_read_headers(self, start, count): # Read some from disk - disk_count = min(count, self.fs_height + 1 - start) - if disk_count > 0: + disk_count = min(count, self.db_height + 1 - start) + if start < 0 or count < 0 or disk_count != count: + raise self.DBError('{:,d} headers starting at {:,d} not on disk' + .format(count, start)) + if disk_count: header_len = self.coin.HEADER_LEN - assert start >= 0 self.headers_file.seek(start * header_len) - result = self.headers_file.read(disk_count * header_len) - count -= disk_count - start += 
disk_count + return self.headers_file.read(disk_count * header_len) + return b'' - # The rest from memory - start -= self.fs_height + 1 - assert count >= 0 and start + count <= len(self.headers) - result += b''.join(self.headers[start: start + count]) + def fs_tx_hash(self, tx_num): + '''Return a pair (tx_hash, tx_height) for the given tx number. - return result + If the tx_height is not on disk, returns (None, tx_height).''' + tx_height = bisect_right(self.tx_counts, tx_num) - def get_tx_hash(self, tx_num): - '''Returns the tx_hash and height of a tx number.''' - height = bisect_right(self.tx_counts, tx_num) + if tx_height > self.db_height: + return None, tx_height + raise self.DBError('tx_num {:,d} is not on disk') - # Is this on disk or unflushed? - if height > self.fs_height: - tx_hashes = self.tx_hashes[height - (self.fs_height + 1)] - tx_hash = tx_hashes[tx_num - self.tx_counts[height - 1]] - else: - file_pos = tx_num * 32 - file_num, offset = divmod(file_pos, self.tx_hash_file_size) - filename = 'hashes{:04d}'.format(file_num) - with self.open_file(filename) as f: - f.seek(offset) - tx_hash = f.read(32) + file_pos = tx_num * 32 + file_num, offset = divmod(file_pos, self.tx_hash_file_size) + filename = 'hashes{:04d}'.format(file_num) + with self.open_file(filename) as f: + f.seek(offset) + return f.read(32), tx_height - return tx_hash, height - - def block_hashes(self, height, count): - headers = self.read_headers(height, count) + def fs_block_hashes(self, height, count): + headers = self.fs_read_headers(height, count) # FIXME: move to coins.py hlen = self.coin.HEADER_LEN return [double_sha256(header) for header in chunks(headers, hlen)] @@ -178,7 +156,7 @@ class DB(LoggedClass): for tx_num in a: if limit == 0: return - yield self.get_tx_hash(tx_num) + yield self.fs_tx_hash(tx_num) limit -= 1 def get_balance(self, hash168): @@ -201,7 +179,7 @@ class DB(LoggedClass): return (tx_num,) = unpack('