diff --git a/server/block_processor.py b/server/block_processor.py index 23dffcd..679bc05 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -10,13 +10,15 @@ import array import asyncio +import itertools +import os import struct import time from bisect import bisect_left from collections import defaultdict from functools import partial -from server.cache import FSCache, UTXOCache, NO_CACHE_ENTRY +from server.cache import UTXOCache, NO_CACHE_ENTRY from server.daemon import Daemon, DaemonError from lib.hash import hash_to_str from lib.tx import Deserializer @@ -315,6 +317,9 @@ class BlockProcessor(server.db.DB): self.last_flush = time.time() self.last_flush_tx_count = self.tx_count + # UTXO cache + self.utxo_cache = UTXOCache(self.get_tx_hash, self.db, self.coin) + # Log state self.logger.info('{}/{} height: {:,d} tx count: {:,d} ' 'flush count: {:,d} utxo flush count: {:,d} ' @@ -408,7 +413,7 @@ class BlockProcessor(server.db.DB): start = self.height - 1 count = 1 while start > 0: - hashes = self.fs_cache.block_hashes(start, count) + hashes = self.block_hashes(start, count) hex_hashes = [hash_to_str(hash) for hash in hashes] d_hex_hashes = await self.daemon.block_hex_hashes(start, count) n = match_pos(hex_hashes, d_hex_hashes) @@ -425,7 +430,7 @@ class BlockProcessor(server.db.DB): 'height {:,d} to height {:,d}' .format(count, start, start + count - 1)) - return self.fs_cache.block_hashes(start, count) + return self.block_hashes(start, count) def clean_db(self): '''Clean out stale DB items. @@ -531,7 +536,9 @@ class BlockProcessor(server.db.DB): if self.height > self.db_height: assert flush_history is None flush_history = self.flush_history - self.fs_cache.flush(self.height, self.tx_count) + self.fs_flush() + self.logger.info('FS flush took {:.1f} seconds' + .format(time.time() - flush_start)) with self.db.write_batch() as batch: # History first - fast and frees memory. Flush state last @@ -590,6 +597,55 @@ class BlockProcessor(server.db.DB): self.history = defaultdict(partial(array.array, 'I')) self.history_size = 0 + def fs_flush(self): + '''Flush the things stored on the filesystem. + The arguments are passed for sanity check assertions only.''' + blocks_done = len(self.headers) + prior_tx_count = (self.tx_counts[self.fs_height] + if self.fs_height >= 0 else 0) + cur_tx_count = self.tx_counts[-1] if self.tx_counts else 0 + txs_done = cur_tx_count - prior_tx_count + + assert self.fs_height + blocks_done == self.height + assert len(self.tx_hashes) == blocks_done + assert len(self.tx_counts) == self.height + 1 + assert cur_tx_count == self.tx_count, \ + 'cur: {:,d} new: {:,d}'.format(cur_tx_count, self.tx_count) + + # First the headers + headers = b''.join(self.headers) + header_len = self.coin.HEADER_LEN + self.headers_file.seek((self.fs_height + 1) * header_len) + self.headers_file.write(headers) + self.headers_file.flush() + + # Then the tx counts + self.txcount_file.seek((self.fs_height + 1) * self.tx_counts.itemsize) + self.txcount_file.write(self.tx_counts[self.fs_height + 1:]) + self.txcount_file.flush() + + # Finally the hashes + hashes = memoryview(b''.join(itertools.chain(*self.tx_hashes))) + assert len(hashes) % 32 == 0 + assert len(hashes) // 32 == txs_done + cursor = 0 + file_pos = prior_tx_count * 32 + while cursor < len(hashes): + file_num, offset = divmod(file_pos, self.tx_hash_file_size) + size = min(len(hashes) - cursor, self.tx_hash_file_size - offset) + filename = 'hashes{:04d}'.format(file_num) + with self.open_file(filename, create=True) as f: + f.seek(offset) + f.write(hashes[cursor:cursor + size]) + cursor += size + file_pos += size + + os.sync() + + self.tx_hashes = [] + self.headers = [] + self.fs_height += blocks_done + def backup_history(self, batch, hash168s): self.logger.info('backing up history to height {:,d} tx_count {:,d}' .format(self.height, self.tx_count)) @@ -659,9 +715,18 @@ class BlockProcessor(server.db.DB): '''Read undo information from a file for the current height.''' return self.db.get(self.undo_key(height)) + def fs_advance_block(self, header, tx_hashes, txs): + '''Update unflushed FS state for a new block.''' + prior_tx_count = self.tx_counts[-1] if self.tx_counts else 0 + + # Cache the new header, tx hashes and cumulative tx count + self.headers.append(header) + self.tx_hashes.append(tx_hashes) + self.tx_counts.append(prior_tx_count + len(txs)) + def advance_block(self, block, update_touched): - # We must update the fs_cache before calling advance_txs() as - # the UTXO cache uses the fs_cache via get_tx_hash() to + # We must update the FS cache before calling advance_txs() as + # the UTXO cache uses the FS cache via get_tx_hash() to # resolve compressed key collisions header, tx_hashes, txs = self.coin.read_block(block) prev_hash, header_hash = self.coin.header_hashes(header) @@ -669,7 +734,7 @@ class BlockProcessor(server.db.DB): raise ChainReorg touched = set() - self.fs_cache.advance_block(header, tx_hashes, txs) + self.fs_advance_block(header, tx_hashes, txs) self.tip = header_hash self.height += 1 undo_info = self.advance_txs(tx_hashes, txs, touched) @@ -730,6 +795,16 @@ class BlockProcessor(server.db.DB): return undo_info + def fs_backup_block(self): + '''Revert a block.''' + assert not self.headers + assert not self.tx_hashes + assert self.fs_height >= 0 + # Just update in-memory. It doesn't matter if disk files are + # too long, they will be overwritten when advancing. + self.fs_height -= 1 + self.tx_counts.pop() + def backup_blocks(self, blocks): '''Backup the blocks and flush. @@ -749,7 +824,7 @@ class BlockProcessor(server.db.DB): hash_to_str(self.tip), self.height)) self.backup_txs(tx_hashes, txs, touched) - self.fs_cache.backup_block() + self.fs_backup_block() self.tip = prev_hash self.height -= 1 diff --git a/server/cache.py b/server/cache.py index f06eb49..834f5b2 100644 --- a/server/cache.py +++ b/server/cache.py @@ -12,14 +12,10 @@ Once synced flushes are performed after processing each block. ''' -import array -import itertools -import os import struct -from bisect import bisect_right -from lib.util import chunks, LoggedClass -from lib.hash import double_sha256, hash_to_str +from lib.util import LoggedClass +from lib.hash import hash_to_str # History can hold approx. 65536 * HIST_ENTRIES_PER_KEY entries @@ -259,155 +255,3 @@ class UTXOCache(LoggedClass): .format(new_utxos, self.db_deletes, hcolls, ucolls)) self.cache_spends = self.db_deletes = 0 - - -class FSCache(LoggedClass): - - def __init__(self, coin, height, tx_count): - super().__init__() - - self.coin = coin - self.tx_hash_file_size = 16 * 1024 * 1024 - assert self.tx_hash_file_size % 32 == 0 - - # On-disk values, updated by a flush - self.height = height - - # Unflushed items - self.headers = [] - self.tx_hashes = [] - - is_new = height == -1 - self.headers_file = self.open_file('headers', is_new) - self.txcount_file = self.open_file('txcount', is_new) - - # tx_counts[N] has the cumulative number of txs at the end of - # height N. So tx_counts[0] is 1 - the genesis coinbase - self.tx_counts = array.array('I') - self.txcount_file.seek(0) - self.tx_counts.fromfile(self.txcount_file, self.height + 1) - if self.tx_counts: - assert tx_count == self.tx_counts[-1] - else: - assert tx_count == 0 - - def open_file(self, filename, create=False): - '''Open the file name. Return its handle.''' - try: - return open(filename, 'rb+') - except FileNotFoundError: - if create: - return open(filename, 'wb+') - raise - - def advance_block(self, header, tx_hashes, txs): - '''Update the FS cache for a new block.''' - prior_tx_count = self.tx_counts[-1] if self.tx_counts else 0 - - # Cache the new header, tx hashes and cumulative tx count - self.headers.append(header) - self.tx_hashes.append(tx_hashes) - self.tx_counts.append(prior_tx_count + len(txs)) - - def backup_block(self): - '''Revert a block.''' - assert not self.headers - assert not self.tx_hashes - assert self.height >= 0 - # Just update in-memory. It doesn't matter if disk files are - # too long, they will be overwritten when advancing. - self.height -= 1 - self.tx_counts.pop() - - def flush(self, new_height, new_tx_count): - '''Flush the things stored on the filesystem. - The arguments are passed for sanity check assertions only.''' - self.logger.info('flushing to file system') - - blocks_done = len(self.headers) - prior_tx_count = self.tx_counts[self.height] if self.height >= 0 else 0 - cur_tx_count = self.tx_counts[-1] if self.tx_counts else 0 - txs_done = cur_tx_count - prior_tx_count - - assert self.height + blocks_done == new_height - assert len(self.tx_hashes) == blocks_done - assert len(self.tx_counts) == new_height + 1 - assert cur_tx_count == new_tx_count, \ - 'cur: {:,d} new: {:,d}'.format(cur_tx_count, new_tx_count) - - # First the headers - headers = b''.join(self.headers) - header_len = self.coin.HEADER_LEN - self.headers_file.seek((self.height + 1) * header_len) - self.headers_file.write(headers) - self.headers_file.flush() - - # Then the tx counts - self.txcount_file.seek((self.height + 1) * self.tx_counts.itemsize) - self.txcount_file.write(self.tx_counts[self.height + 1:]) - self.txcount_file.flush() - - # Finally the hashes - hashes = memoryview(b''.join(itertools.chain(*self.tx_hashes))) - assert len(hashes) % 32 == 0 - assert len(hashes) // 32 == txs_done - cursor = 0 - file_pos = prior_tx_count * 32 - while cursor < len(hashes): - file_num, offset = divmod(file_pos, self.tx_hash_file_size) - size = min(len(hashes) - cursor, self.tx_hash_file_size - offset) - filename = 'hashes{:04d}'.format(file_num) - with self.open_file(filename, create=True) as f: - f.seek(offset) - f.write(hashes[cursor:cursor + size]) - cursor += size - file_pos += size - - os.sync() - - self.tx_hashes = [] - self.headers = [] - self.height += blocks_done - - def read_headers(self, start, count): - result = b'' - - # Read some from disk - disk_count = min(count, self.height + 1 - start) - if disk_count > 0: - header_len = self.coin.HEADER_LEN - assert start >= 0 - self.headers_file.seek(start * header_len) - result = self.headers_file.read(disk_count * header_len) - count -= disk_count - start += disk_count - - # The rest from memory - start -= self.height + 1 - assert count >= 0 and start + count <= len(self.headers) - result += b''.join(self.headers[start: start + count]) - - return result - - def get_tx_hash(self, tx_num): - '''Returns the tx_hash and height of a tx number.''' - height = bisect_right(self.tx_counts, tx_num) - - # Is this on disk or unflushed? - if height > self.height: - tx_hashes = self.tx_hashes[height - (self.height + 1)] - tx_hash = tx_hashes[tx_num - self.tx_counts[height - 1]] - else: - file_pos = tx_num * 32 - file_num, offset = divmod(file_pos, self.tx_hash_file_size) - filename = 'hashes{:04d}'.format(file_num) - with self.open_file(filename) as f: - f.seek(offset) - tx_hash = f.read(32) - - return tx_hash, height - - def block_hashes(self, height, count): - headers = self.read_headers(height, count) - hlen = self.coin.HEADER_LEN - return [double_sha256(header) for header in chunks(headers, hlen)] diff --git a/server/db.py b/server/db.py index 9215dbd..fd2d65f 100644 --- a/server/db.py +++ b/server/db.py @@ -12,15 +12,15 @@ import array import ast import os import struct +from bisect import bisect_right from collections import namedtuple -from server.cache import FSCache, UTXOCache, NO_CACHE_ENTRY -from lib.util import LoggedClass +from lib.util import chunks, LoggedClass +from lib.hash import double_sha256 from server.storage import open_db UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value") - class DB(LoggedClass): '''Simple wrapper of the backend database for querying. @@ -28,6 +28,9 @@ class DB(LoggedClass): it was shutdown uncleanly. ''' + class DBError(Exception): + pass + def __init__(self, env): super().__init__() self.env = env @@ -52,13 +55,29 @@ class DB(LoggedClass): self.height = self.db_height self.tip = self.db_tip - # Cache wrapping the filesystem and redirected functions - self.fs_cache = FSCache(self.coin, self.height, self.tx_count) - self.get_tx_hash = self.fs_cache.get_tx_hash - self.read_headers = self.fs_cache.read_headers + # -- FS related members -- + self.tx_hash_file_size = 16 * 1024 * 1024 - # UTXO cache - self.utxo_cache = UTXOCache(self.get_tx_hash, self.db, self.coin) + # On-disk height updated by a flush + self.fs_height = self.height + + # Unflushed items + self.headers = [] + self.tx_hashes = [] + + create = self.height == -1 + self.headers_file = self.open_file('headers', create) + self.txcount_file = self.open_file('txcount', create) + + # tx_counts[N] has the cumulative number of txs at the end of + # height N. So tx_counts[0] is 1 - the genesis coinbase + self.tx_counts = array.array('I') + self.txcount_file.seek(0) + self.tx_counts.fromfile(self.txcount_file, self.height + 1) + if self.tx_counts: + assert self.tx_count == self.tx_counts[-1] + else: + assert self.tx_count == 0 def init_state_from_db(self): if self.db.is_new: @@ -73,9 +92,9 @@ class DB(LoggedClass): state = self.db.get(b'state') state = ast.literal_eval(state.decode()) if state['genesis'] != self.coin.GENESIS_HASH: - raise ChainError('DB genesis hash {} does not match coin {}' - .format(state['genesis_hash'], - self.coin.GENESIS_HASH)) + raise self.DBError('DB genesis hash {} does not match coin {}' + .format(state['genesis_hash'], + self.coin.GENESIS_HASH)) self.db_height = state['height'] self.db_tx_count = state['tx_count'] self.db_tip = state['tip'] @@ -84,6 +103,59 @@ class DB(LoggedClass): self.wall_time = state['wall_time'] self.first_sync = state.get('first_sync', True) + def open_file(self, filename, create=False): + '''Open the file name. Return its handle.''' + try: + return open(filename, 'rb+') + except FileNotFoundError: + if create: + return open(filename, 'wb+') + raise + + def read_headers(self, start, count): + result = b'' + + # Read some from disk + disk_count = min(count, self.fs_height + 1 - start) + if disk_count > 0: + header_len = self.coin.HEADER_LEN + assert start >= 0 + self.headers_file.seek(start * header_len) + result = self.headers_file.read(disk_count * header_len) + count -= disk_count + start += disk_count + + # The rest from memory + start -= self.fs_height + 1 + assert count >= 0 and start + count <= len(self.headers) + result += b''.join(self.headers[start: start + count]) + + return result + + def get_tx_hash(self, tx_num): + '''Returns the tx_hash and height of a tx number.''' + height = bisect_right(self.tx_counts, tx_num) + + # Is this on disk or unflushed? + if height > self.fs_height: + tx_hashes = self.tx_hashes[height - (self.fs_height + 1)] + tx_hash = tx_hashes[tx_num - self.tx_counts[height - 1]] + else: + file_pos = tx_num * 32 + file_num, offset = divmod(file_pos, self.tx_hash_file_size) + filename = 'hashes{:04d}'.format(file_num) + with self.open_file(filename) as f: + f.seek(offset) + tx_hash = f.read(32) + + return tx_hash, height + + def block_hashes(self, height, count): + headers = self.read_headers(height, count) + # FIXME: move to coins.py + hlen = self.coin.HEADER_LEN + return [double_sha256(header) for header in chunks(headers, hlen)] + @staticmethod def _resolve_limit(limit): if limit is None: @@ -143,7 +215,28 @@ class DB(LoggedClass): hash168 = None if 0 <= index <= 65535: idx_packed = struct.pack('