diff --git a/server/db.py b/server/db.py index b87249f..17b0f98 100644 --- a/server/db.py +++ b/server/db.py @@ -275,6 +275,157 @@ class UTXOCache(LoggedClass): self.adds = self.cache_hits = self.db_deletes = 0 +class FSCache(LoggedClass): + + def __init__(self, coin, height, tx_count): + super().__init__() + + self.coin = coin + self.tx_hash_file_size = 16 * 1024 * 1024 + assert self.tx_hash_file_size % 32 == 0 + + # On-disk values, updated by a flush + self.height = height + self.tx_count = tx_count + + # Unflushed items + self.headers = [] + self.tx_hashes = [] + + is_new = height == -1 + self.headers_file = self.open_file('headers', is_new) + self.txcount_file = self.open_file('txcount', is_new) + + self.tx_counts = array.array('I') + self.txcount_file.seek(0) + self.tx_counts.fromfile(self.txcount_file, self.height + 1) + + def open_file(self, filename, create=False): + '''Open the file name. Return its handle.''' + try: + return open(filename, 'rb+') + except FileNotFoundError: + if create: + return open(filename, 'wb+') + raise + + def process_block(self, block): + '''Process a new block.''' + assert len(self.tx_counts) == self.height + 1 + len(self.headers) + + tx_hashes, txs = self.coin.read_block(block) + + # Cache the new header, tx hashes and cumulative tx count + self.headers.append(block[:self.coin.HEADER_LEN]) + self.tx_hashes.append(tx_hashes) + self.tx_counts.append(self.tx_count + len(txs)) + + return tx_hashes, txs + + def flush(self, new_height, new_tx_count): + '''Flush the things stored on the filesystem.''' + self.logger.info('flushing to file system') + + block_count = len(self.headers) + assert self.height + block_count == new_height + assert len(self.tx_hashes) == block_count + assert len(self.tx_counts) == self.height + 1 + block_count + + # First the headers + headers = b''.join(self.headers) + header_len = self.coin.HEADER_LEN + self.headers_file.seek((self.height + 1) * header_len) + self.headers_file.write(headers) + self.headers_file.flush() + + # Then the tx counts + self.txcount_file.seek((self.height + 1) * self.tx_counts.itemsize) + self.txcount_file.write(self.tx_counts[self.height + 1:]) + self.txcount_file.flush() + + # Finally the hashes + hashes = memoryview(b''.join(itertools.chain(*self.tx_hashes))) + assert len(hashes) % 32 == 0 + assert self.tx_count + len(hashes) // 32 == new_tx_count + cursor = 0 + file_pos = self.tx_count * 32 + while cursor < len(hashes): + file_num, offset = divmod(file_pos, self.tx_hash_file_size) + size = min(len(hashes) - cursor, self.tx_hash_file_size - offset) + filename = 'hashes{:04d}'.format(file_num) + with self.open_file(filename, create=True) as f: + f.seek(offset) + f.write(hashes[cursor:cursor + size]) + cursor += size + file_pos += size + + os.sync() + + tx_diff = new_tx_count - self.tx_count + self.tx_hashes = [] + self.headers = [] + self.height += block_count + self.tx_count = new_tx_count + + return tx_diff + + def read_headers(self, height, count): + read_count = min(count, self.height + 1 - height) + + assert height >= 0 and read_count >= 0 + assert count <= read_count + len(self.headers) + + result = b'' + if read_count > 0: + header_len = self.coin.HEADER_LEN + self.headers_file.seek(height * header_len) + result = self.headers_file.read(read_count * header_len) + + count -= read_count + if count: + start = (height + read_count) - (self.height + 1) + result += b''.join(self.headers[start: start + count]) + + return result + + def get_tx_hash(self, tx_num): + '''Returns the tx_hash and height of a tx number.''' + height = bisect_right(self.tx_counts, tx_num) + + # Is this on disk or unflushed? + if height > self.height: + tx_hashes = self.tx_hashes[height - (self.height + 1)] + tx_hash = tx_hashes[tx_num - self.tx_counts[height - 1]] + else: + file_pos = tx_num * 32 + file_num, offset = divmod(file_pos, self.tx_hash_file_size) + filename = 'hashes{:04d}'.format(file_num) + with self.open_file(filename) as f: + f.seek(offset) + tx_hash = f.read(32) + + return tx_hash, height + + def encode_header(self, height): + if height < 0 or height > self.height + len(self.headers): + raise Exception('no header information for height {:,d}' + .format(height)) + header = self.read_headers(self.height, 1) + unpack = struct.unpack + version, = unpack(' self.fs_height: - tx_hashes = self.tx_hashes[height - (self.fs_height + 1)] - tx_hash = tx_hashes[tx_num - self.tx_counts[height - 1]] - else: - file_pos = tx_num * 32 - file_num, offset = divmod(file_pos, self.tx_hash_file_size) - filename = 'hashes{:04d}'.format(file_num) - with self.open_file(filename) as f: - f.seek(offset) - tx_hash = f.read(32) - - return tx_hash, height - @staticmethod def resolve_limit(limit): if limit is None: @@ -674,26 +736,6 @@ class DB(LoggedClass): position in the block.''' return sorted(self.get_utxos(hash168, limit=None)) - def encode_header(self): - if self.height == -1: - return None - header = self.read_headers(self.height, 1) - unpack = struct.unpack - version, = unpack('