Merge branch 'develop'
commit 909edd0728
@@ -100,9 +100,8 @@ Database Format
 ===============
 
 The database and metadata formats of ElectrumX are very likely to
-change in the future. If so old DBs would not be usable. However it
-should be easy to write short Python script to do any necessary
-conversions in-place without having to start afresh.
+change in the future which will render old DBs unusable. For now I do
+not intend to provide converters as the rate of flux is high.
 
 
 Miscellany
server/db.py (178 changed lines)
@@ -17,9 +17,11 @@ import plyvel
 from lib.coins import Bitcoin
 from lib.script import ScriptPubKey
 
-ADDR_TX_HASH_LEN=6
-UTXO_TX_HASH_LEN=4
-HIST_ENTRY_LEN=256*4 # Admits 65536 * HIST_ENTRY_LEN/4 entries
+# History can hold approx. 65536 * HIST_ENTRIES_PER_KEY entries
+HIST_ENTRIES_PER_KEY = 1024
+HIST_VALUE_BYTES = HIST_ENTRIES_PER_KEY * 4
+ADDR_TX_HASH_LEN = 4
+UTXO_TX_HASH_LEN = 4
 
 UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")
 
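As a sanity check on the new constants (not part of the commit): each history value packs HIST_ENTRIES_PER_KEY unsigned 32-bit tx numbers, and the two-byte big-endian key suffix used later for flush ids admits 65536 values per hash160, which matches the capacity estimate in the new comment.

import array, struct

HIST_ENTRIES_PER_KEY = 1024
HIST_VALUE_BYTES = HIST_ENTRIES_PER_KEY * 4

# 1024 uint32 entries pack into exactly HIST_VALUE_BYTES bytes
hist = array.array('I', range(HIST_ENTRIES_PER_KEY))
assert hist.itemsize == 4 and len(hist.tobytes()) == HIST_VALUE_BYTES

# A '>H' suffix gives 65536 possible keys per address, so roughly
# 65536 * HIST_ENTRIES_PER_KEY history entries in total
assert struct.calcsize('>H') == 2
print('{:,d}'.format(65536 * HIST_ENTRIES_PER_KEY))  # 67,108,864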
@@ -36,6 +38,7 @@ class DB(object):
     TIP_KEY = b'tip'
     GENESIS_KEY = b'genesis'
     TX_COUNT_KEY = b'tx_count'
+    FLUSH_COUNT_KEY = b'flush_count'
     WALL_TIME_KEY = b'wall_time'
 
     class Error(Exception):
@@ -59,8 +62,7 @@ class DB(object):
         self.writes_avoided = 0
         self.read_cache_hits = 0
         self.write_cache_hits = 0
-        self.last_writes = 0
-        self.last_time = time.time()
+        self.hcolls = 0
 
         # Things put in a batch are not visible until the batch is written,
         # so use a cache.
@@ -95,6 +97,8 @@ class DB(object):
         self.tx_count = self.db_tx_count
         self.height = self.db_height - 1
         self.tx_counts.fromfile(self.txcount_file, self.db_height)
+        self.last_flush = time.time()
 
+        # FIXME: this sucks and causes issues with exceptions in init_db()
         if self.tx_count == 0:
             self.flush()
@@ -107,6 +111,7 @@ class DB(object):
     def init_db(self):
         self.db_height = 0
         self.db_tx_count = 0
+        self.flush_count = 0
         self.wall_time = 0
         self.tip = self.coin.GENESIS_HASH
         self.put(self.GENESIS_KEY, unhexlify(self.tip))
@@ -118,12 +123,14 @@ class DB(object):
                          .format(genesis_hash, self.coin.GENESIS_HASH))
         self.db_height = from_4_bytes(self.get(self.HEIGHT_KEY))
         self.db_tx_count = from_4_bytes(self.get(self.TX_COUNT_KEY))
+        self.flush_count = from_4_bytes(self.get(self.FLUSH_COUNT_KEY))
         self.wall_time = from_4_bytes(self.get(self.WALL_TIME_KEY))
         self.tip = hexlify(self.get(self.TIP_KEY))
-        self.logger.info('{}/{} height: {:,d} tx count: {:,d} sync time: {}'
+        self.logger.info('{}/{} height: {:,d} tx count: {:,d} '
+                         'flush count: {:,d} sync time: {}'
                          .format(self.coin.NAME, self.coin.NET,
                                  self.db_height - 1, self.db_tx_count,
-                                 self.formatted_wall_time()))
+                                 self.flush_count, self.formatted_wall_time()))
 
     def formatted_wall_time(self):
         wall_time = int(self.wall_time)
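The from_4_bytes/to_4_bytes helpers used above are defined elsewhere in server/db.py; this hunk shows only their call sites. A minimal sketch consistent with those calls, assuming a little-endian unsigned 32-bit layout (the actual byte order is not visible in this diff):

import struct

def to_4_bytes(value):
    # Assumed: pack a non-negative int as 4 little-endian bytes
    return struct.pack('<I', value)

def from_4_bytes(data):
    # Assumed inverse of to_4_bytes
    (value, ) = struct.unpack('<I', data)
    return value

assert from_4_bytes(to_4_bytes(481_824)) == 481_824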
@@ -158,54 +165,43 @@ class DB(object):
         else:
             self.write_cache[key] = None
 
-    def put_state(self):
-        now = time.time()
-        self.wall_time += now - self.last_time
-        self.last_time = now
-        self.db_tx_count = self.tx_count
-        self.db_height = self.height + 1
-        self.put(self.HEIGHT_KEY, to_4_bytes(self.db_height))
-        self.put(self.TX_COUNT_KEY, to_4_bytes(self.db_tx_count))
-        self.put(self.TIP_KEY, unhexlify(self.tip))
-        self.put(self.WALL_TIME_KEY, to_4_bytes(int(self.wall_time)))
-
     def flush(self):
         '''Flush out all cached state.'''
         flush_start = time.time()
+        last_flush = self.last_flush
+        tx_diff = self.tx_count - self.db_tx_count
+        height_diff = self.height + 1 - self.db_height
+        self.logger.info('starting flush {:,d} txs and {:,d} blocks'
+                         .format(tx_diff, height_diff))
 
         # Write out the files to the FS before flushing to the DB. If
         # the DB transaction fails, the files being too long doesn't
         # matter. But if writing the files fails we do not want to
-        # have updated the DB. This disk flush is fast.
-        self.write_headers()
-        self.write_tx_counts()
-        self.write_tx_hashes()
-
-        tx_diff = self.tx_count - self.db_tx_count
-        height_diff = self.height + 1 - self.db_height
-        self.logger.info('flushing to levelDB {:,d} txs and {:,d} blocks '
-                         'to height {:,d} tx count: {:,d}'
-                         .format(tx_diff, height_diff, self.height,
-                                 self.tx_count))
-
-        # This LevelDB flush is slow
-        deletes = 0
-        writes = 0
+        # have updated the DB. Flush state last as it reads the wall
+        # time.
+        self.flush_to_fs()
         with self.db.write_batch(transaction=True) as batch:
-            # Flush the state, then the cache, then the history
-            self.put_state()
-            for key, value in self.write_cache.items():
-                if value is None:
-                    batch.delete(key)
-                    deletes += 1
-                else:
-                    batch.put(key, value)
-                    writes += 1
-
-        self.flush_history()
-
-        self.logger.info('flushed. Cache hits: {:,d}/{:,d} writes: {:,d} '
-                         'deletes: {:,d} elided: {:,d} sync: {}'
-                         .format(self.write_cache_hits,
-                                 self.read_cache_hits, writes, deletes,
-                                 self.writes_avoided,
-                                 self.formatted_wall_time()))
+            self.flush_cache(batch)
+            self.flush_history(batch)
+            self.logger.info('flushed history...')
+            self.flush_state(batch)
+            self.logger.info('committing transaction...')
+
+        # Update and put the wall time again - otherwise we drop the
+        # time it takes leveldb to commit the batch
+        self.update_wall_time(self.db)
+
+        flush_time = int(self.last_flush - flush_start)
+        self.logger.info('flushed in {:,d}s to height {:,d} tx count {:,d} '
+                         'flush count {:,d}'
+                         .format(flush_time, self.height, self.tx_count,
+                                 self.flush_count))
+
+        txs_per_sec = int(self.tx_count / self.wall_time)
+        this_txs_per_sec = int(tx_diff / (self.last_flush - last_flush))
+        self.logger.info('tx/s since genesis: {:,d} since last flush: {:,d} '
+                         'sync time {}'
+                         .format(txs_per_sec, this_txs_per_sec,
+                                 self.formatted_wall_time()))
 
         # Note this preserves semantics and hopefully saves time
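The comment block above carries the crash-safety argument: the append-only files are written and synced before the atomic LevelDB batch commits, so a crash between the two steps leaves files that are at worst longer than the DB state claims, which a restart can ignore. A stand-alone sketch of that ordering invariant (helper names are hypothetical, not from the commit):

import os

def crash_safe_flush(write_files, commit_db_batch):
    # Step 1: append file data and force it to disk.
    write_files()
    os.sync()
    # Step 2: commit the DB state atomically. If we die before this
    # point the DB still references a valid prefix of the files; the
    # reverse order could leave the DB pointing at data never written.
    commit_db_batch()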
@@ -214,36 +210,55 @@ class DB(object):
         self.writes_avoided = 0
         self.read_cache_hits = 0
         self.write_cache_hits = 0
-        self.last_writes = writes
 
-    def flush_history(self):
+    def flush_to_fs(self):
+        '''Flush the things stored on the filesystem.'''
+        self.write_headers()
+        self.write_tx_counts()
+        self.write_tx_hashes()
+        os.sync()
+
+    def update_wall_time(self, dest):
+        now = time.time()
+        self.wall_time += now - self.last_flush
+        self.last_flush = now
+        dest.put(self.WALL_TIME_KEY, to_4_bytes(int(self.wall_time)))
+
+    def flush_state(self, batch):
+        self.db_tx_count = self.tx_count
+        self.db_height = self.height + 1
+        batch.put(self.HEIGHT_KEY, to_4_bytes(self.db_height))
+        batch.put(self.TX_COUNT_KEY, to_4_bytes(self.db_tx_count))
+        batch.put(self.FLUSH_COUNT_KEY, to_4_bytes(self.flush_count))
+        batch.put(self.TIP_KEY, unhexlify(self.tip))
+        self.update_wall_time(batch)
+        self.flush_count += 1
+
+    def flush_cache(self, batch):
+        '''Flushes the UTXO write cache.'''
+        deletes = writes = 0
+        for n, (key, value) in enumerate(self.write_cache.items()):
+            if value is None:
+                batch.delete(key)
+                deletes += 1
+            else:
+                batch.put(key, value)
+                writes += 1
+
+        self.logger.info('flushed UTXO cache. Hits: {:,d}/{:,d} '
+                         'writes: {:,d} deletes: {:,d} elided: {:,d}'
+                         .format(self.write_cache_hits,
+                                 self.read_cache_hits, writes, deletes,
+                                 self.writes_avoided))
+
+    def flush_history(self, batch):
         # Drop any None entry
         self.history.pop(None, None)
 
+        flush_id = struct.pack('>H', self.flush_count)
         for hash160, hist in self.history.items():
-            prefix = b'H' + hash160
-            for key, v in self.db.iterator(reverse=True, prefix=prefix,
-                                           fill_cache=False):
-                assert len(key) == 23
-                v += array.array('I', hist).tobytes()
-                break
-            else:
-                key = prefix + bytes(2)
-                v = array.array('I', hist).tobytes()
-
-            # db.put doesn't accept a memoryview!
-            self.db.put(key, v[:HIST_ENTRY_LEN])
-            if len(v) > HIST_ENTRY_LEN:
-                # must be big-endian
-                (idx, ) = struct.unpack('>H', key[-2:])
-                for n in range(HIST_ENTRY_LEN, len(v), HIST_ENTRY_LEN):
-                    idx += 1
-                    key = prefix + struct.pack('>H', idx)
-                    if idx % 500 == 0:
-                        addr = self.coin.P2PKH_address_from_hash160(hash160)
-                        self.logger.info('address {} hist moving to idx {:d}'
-                                         .format(addr, idx))
-                    self.db.put(key, v[n:n + HIST_ENTRY_LEN])
+            key = b'H' + hash160 + flush_id
+            batch.put(key, array.array('I', hist).tobytes())
 
         self.history = defaultdict(list)
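With the new scheme each flush appends one immutable b'H' + hash160 + flush_id entry instead of rewriting fixed-size slots in place. Reading an address's history back is then a prefix iteration; because the flush id is big-endian, lexicographic key order is chronological order. A sketch of that read path against a plyvel-style iterator (this reader is not part of the diff):

import array

def read_history(db, hash160):
    # Concatenate every flushed chunk for one address, oldest first.
    hist = array.array('I')
    for key, value in db.iterator(prefix=b'H' + hash160):
        hist.frombytes(value)
    return hist.tolist()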
@@ -258,10 +273,11 @@ class DB(object):
                 self.delete(key)
                 return data[:20]
 
-        # This should almost never happen
         assert len(data) % 24 == 0
-        self.logger.info('hash160 compressed key collision {}'
-                         .format(key.hex()))
+        self.hcolls += 1
+        if self.hcolls % 1000 == 0:
+            self.logger.info('{} total hash160 compressed key collisions'
+                             .format(self.hcolls))
         for n in range(0, len(data), 24):
             (tx_num, ) = struct.unpack('<I', data[n+20:n+24])
             my_hash, height = self.get_tx_hash(tx_num)
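Collisions here are a designed-in consequence of compressing the tx hash to ADDR_TX_HASH_LEN bytes in the lookup key, which is why the code now counts them instead of treating each one as noteworthy. Illustrative birthday-bound arithmetic, with a hypothetical entry count (not from the commit):

# With a 4-byte compressed hash there are 2**32 distinct key values,
# so among n entries roughly n*(n-1)/2 / 2**32 pairs collide and must
# be disambiguated by fetching the full tx hash, as the loop above does.
n = 100_000_000                              # hypothetical entry count
expected_pairs = n * (n - 1) / 2 / 2 ** 32
print('{:,.0f}'.format(expected_pairs))      # about 1.2 million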
@@ -285,16 +301,18 @@ class DB(object):
         key = (b'u' + hash160 + prevout.hash[:UTXO_TX_HASH_LEN]
                + struct.pack('<H', prevout.n))
         data = self.get(key)
         if data is None:
             self.logger.error('found no UTXO for {} / {:d} key {}'
                               .format(bytes(reversed(prevout.hash)).hex(),
                                       prevout.n, bytes(key).hex()))
             return hash160
 
         if len(data) == 12:
             (tx_num, ) = struct.unpack('<I', data[:4])
             self.delete(key)
         else:
             # This should almost never happen
             assert len(data) % (4 + 8) == 0
+            self.logger.info('UTXO compressed key collision at height {:d}, '
+                             'utxo {} / {:d}'
+                             .format(self.height, bytes(reversed(prevout.hash))
+                                     .hex(), prevout.n))
             for n in range(0, len(data), 12):
                 (tx_num, ) = struct.unpack('<I', data[n:n+4])
                 tx_hash, height = self.get_tx_hash(tx_num)
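The UTXO key built above is a fixed 27-byte layout: a one-byte b'u' table prefix, the 20-byte hash160, the first UTXO_TX_HASH_LEN bytes of the funding tx hash, and the output index as a little-endian uint16. A sketch with dummy values:

import struct

UTXO_TX_HASH_LEN = 4

def utxo_key(hash160, tx_hash, n):
    # b'u' prefix + owner hash160 + compressed tx hash + LE uint16 index
    return b'u' + hash160 + tx_hash[:UTXO_TX_HASH_LEN] + struct.pack('<H', n)

key = utxo_key(bytes(20), bytes(32), 1)
assert len(key) == 1 + 20 + UTXO_TX_HASH_LEN + 2   # 27 bytes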
@@ -6,6 +6,7 @@ import json
 import logging
 import os
+import signal
 import time
 from functools import partial
 
 import aiohttp
@@ -73,7 +74,15 @@ class BlockCache(object):
         self.logger.info('catching up, block cache limit {:d}MB...'
                          .format(self.cache_limit))
 
+        last_log = 0
         while await self.maybe_prefill():
+            now = time.time()
+            if now > last_log + 15:
+                last_log = now
+                self.logger.info('prefilled blocks to height {:,d} '
+                                 'daemon height: {:,d}'
+                                 .format(self.fetched_height,
+                                         self.daemon_height))
             await asyncio.sleep(1)
 
         if not self.stop:
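The prefill loop now rate-limits its progress output to one line every 15 seconds rather than logging per iteration. The same pattern extracted into a stand-alone sketch (class name hypothetical):

import time

class ThrottledLogger:
    '''Forward at most one message per `interval` seconds.'''

    def __init__(self, logger, interval=15):
        self.logger = logger
        self.interval = interval
        self.last_log = 0

    def maybe_log(self, msg):
        now = time.time()
        if now > self.last_log + self.interval:
            self.last_log = now
            self.logger.info(msg)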
@@ -108,7 +117,6 @@ class BlockCache(object):
         if not count or self.stop:
             return False  # Done catching up
 
-        # self.logger.info('requesting {:,d} blocks'.format(count))
         first = self.fetched_height + 1
         param_lists = [[height] for height in range(first, first + count)]
         hashes = await self.rpc.rpc_multi('getblockhash', param_lists)
@@ -129,11 +137,6 @@ class BlockCache(object):
         # Reverse order and place at front of list
         self.blocks = list(reversed(blocks)) + self.blocks
 
-        self.logger.info('prefilled {:,d} blocks to height {:,d} '
-                         'daemon height: {:,d} block cache size: {:,d}'
-                         .format(count, self.fetched_height,
-                                 self.daemon_height, self.cache_used()))
-
         # Keep 50 most recent block sizes for fetch count estimation
         sizes = [len(block) for block in blocks]
         self.recent_sizes.extend(sizes)
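The retained comment points at fetch-count estimation from the rolling window of recent block sizes. The estimator itself is outside this hunk; a plausible sketch under that assumption (function name and cap are hypothetical):

def estimate_fetch_count(recent_sizes, cache_room_bytes, max_count=4000):
    # Guess how many more blocks fit in the remaining cache budget
    # using the average size of recently fetched blocks.
    if not recent_sizes:
        return max_count
    avg_size = sum(recent_sizes) / len(recent_sizes)
    return max(1, min(max_count, int(cache_room_bytes // avg_size)))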