Split out part of block processor into db.py
The part that doesn't actually do any block processing...
This commit is contained in:
parent
1393f6a030
commit
c0a112f8ea
@ -22,6 +22,7 @@ from server.daemon import DaemonError
|
|||||||
from lib.hash import hash_to_str
|
from lib.hash import hash_to_str
|
||||||
from lib.tx import Deserializer
|
from lib.tx import Deserializer
|
||||||
from lib.util import chunks, LoggedClass
|
from lib.util import chunks, LoggedClass
|
||||||
|
import server.db
|
||||||
from server.storage import open_db
|
from server.storage import open_db
|
||||||
|
|
||||||
|
|
||||||
@ -33,9 +34,6 @@ def formatted_time(t):
|
|||||||
t // 86400, (t % 86400) // 3600, (t % 3600) // 60, t % 60)
|
t // 86400, (t % 86400) // 3600, (t % 3600) // 60, t % 60)
|
||||||
|
|
||||||
|
|
||||||
UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")
|
|
||||||
|
|
||||||
|
|
||||||
class ChainError(Exception):
|
class ChainError(Exception):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@ -283,7 +281,7 @@ class MemPool(LoggedClass):
|
|||||||
return value
|
return value
|
||||||
|
|
||||||
|
|
||||||
class BlockProcessor(LoggedClass):
|
class BlockProcessor(server.db.DB):
|
||||||
'''Process blocks and update the DB state to match.
|
'''Process blocks and update the DB state to match.
|
||||||
|
|
||||||
Employ a prefetcher to prefetch blocks in batches for processing.
|
Employ a prefetcher to prefetch blocks in batches for processing.
|
||||||
@ -292,9 +290,8 @@ class BlockProcessor(LoggedClass):
|
|||||||
|
|
||||||
def __init__(self, env, daemon, on_update=None):
|
def __init__(self, env, daemon, on_update=None):
|
||||||
'''on_update is awaitable, and called only when caught up with the
|
'''on_update is awaitable, and called only when caught up with the
|
||||||
daemon and a new block arrives or the mempool is updated.
|
daemon and a new block arrives or the mempool is updated.'''
|
||||||
'''
|
super().__init__(env.coin, env.db_engine)
|
||||||
super().__init__()
|
|
||||||
|
|
||||||
self.daemon = daemon
|
self.daemon = daemon
|
||||||
self.on_update = on_update
|
self.on_update = on_update
|
||||||
@ -305,39 +302,16 @@ class BlockProcessor(LoggedClass):
|
|||||||
self.utxo_MB = env.utxo_MB
|
self.utxo_MB = env.utxo_MB
|
||||||
self.hist_MB = env.hist_MB
|
self.hist_MB = env.hist_MB
|
||||||
self.next_cache_check = 0
|
self.next_cache_check = 0
|
||||||
self.coin = env.coin
|
|
||||||
self.reorg_limit = env.reorg_limit
|
self.reorg_limit = env.reorg_limit
|
||||||
|
|
||||||
# Open DB and metadata files. Record some of its state.
|
# Headers and tx_hashes have one entry per block
|
||||||
db_name = '{}-{}'.format(self.coin.NAME, self.coin.NET)
|
|
||||||
self.db = open_db(db_name, env.db_engine)
|
|
||||||
if self.db.is_new:
|
|
||||||
self.logger.info('created new {} database {}'
|
|
||||||
.format(env.db_engine, db_name))
|
|
||||||
else:
|
|
||||||
self.logger.info('successfully opened {} database {}'
|
|
||||||
.format(env.db_engine, db_name))
|
|
||||||
|
|
||||||
self.init_state()
|
|
||||||
self.tx_count = self.db_tx_count
|
|
||||||
self.height = self.db_height
|
|
||||||
self.tip = self.db_tip
|
|
||||||
|
|
||||||
# Caches to be flushed later. Headers and tx_hashes have one
|
|
||||||
# entry per block
|
|
||||||
self.history = defaultdict(partial(array.array, 'I'))
|
self.history = defaultdict(partial(array.array, 'I'))
|
||||||
self.history_size = 0
|
self.history_size = 0
|
||||||
self.utxo_cache = UTXOCache(self, self.db, self.coin)
|
|
||||||
self.fs_cache = FSCache(self.coin, self.height, self.tx_count)
|
|
||||||
self.prefetcher = Prefetcher(daemon, self.height)
|
self.prefetcher = Prefetcher(daemon, self.height)
|
||||||
|
|
||||||
self.last_flush = time.time()
|
self.last_flush = time.time()
|
||||||
self.last_flush_tx_count = self.tx_count
|
self.last_flush_tx_count = self.tx_count
|
||||||
|
|
||||||
# Redirected member funcs
|
|
||||||
self.get_tx_hash = self.fs_cache.get_tx_hash
|
|
||||||
self.read_headers = self.fs_cache.read_headers
|
|
||||||
|
|
||||||
# Log state
|
# Log state
|
||||||
self.logger.info('{}/{} height: {:,d} tx count: {:,d} '
|
self.logger.info('{}/{} height: {:,d} tx count: {:,d} '
|
||||||
'flush count: {:,d} utxo flush count: {:,d} '
|
'flush count: {:,d} utxo flush count: {:,d} '
|
||||||
@ -451,30 +425,6 @@ class BlockProcessor(LoggedClass):
|
|||||||
|
|
||||||
return self.fs_cache.block_hashes(start, count)
|
return self.fs_cache.block_hashes(start, count)
|
||||||
|
|
||||||
def init_state(self):
|
|
||||||
if self.db.is_new:
|
|
||||||
self.db_height = -1
|
|
||||||
self.db_tx_count = 0
|
|
||||||
self.db_tip = b'\0' * 32
|
|
||||||
self.flush_count = 0
|
|
||||||
self.utxo_flush_count = 0
|
|
||||||
self.wall_time = 0
|
|
||||||
self.first_sync = True
|
|
||||||
else:
|
|
||||||
state = self.db.get(b'state')
|
|
||||||
state = ast.literal_eval(state.decode())
|
|
||||||
if state['genesis'] != self.coin.GENESIS_HASH:
|
|
||||||
raise ChainError('DB genesis hash {} does not match coin {}'
|
|
||||||
.format(state['genesis_hash'],
|
|
||||||
self.coin.GENESIS_HASH))
|
|
||||||
self.db_height = state['height']
|
|
||||||
self.db_tx_count = state['tx_count']
|
|
||||||
self.db_tip = state['tip']
|
|
||||||
self.flush_count = state['flush_count']
|
|
||||||
self.utxo_flush_count = state['utxo_flush_count']
|
|
||||||
self.wall_time = state['wall_time']
|
|
||||||
self.first_sync = state.get('first_sync', True)
|
|
||||||
|
|
||||||
def clean_db(self):
|
def clean_db(self):
|
||||||
'''Clean out stale DB items.
|
'''Clean out stale DB items.
|
||||||
|
|
||||||
@ -839,13 +789,6 @@ class BlockProcessor(LoggedClass):
|
|||||||
assert n == 0
|
assert n == 0
|
||||||
self.tx_count -= len(txs)
|
self.tx_count -= len(txs)
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def resolve_limit(limit):
|
|
||||||
if limit is None:
|
|
||||||
return -1
|
|
||||||
assert isinstance(limit, int) and limit >= 0
|
|
||||||
return limit
|
|
||||||
|
|
||||||
def mempool_transactions(self, hash168):
|
def mempool_transactions(self, hash168):
|
||||||
'''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
|
'''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
|
||||||
entries for the hash168.
|
entries for the hash168.
|
||||||
@ -860,60 +803,3 @@ class BlockProcessor(LoggedClass):
|
|||||||
Can be positive or negative.
|
Can be positive or negative.
|
||||||
'''
|
'''
|
||||||
return self.mempool.value(hash168)
|
return self.mempool.value(hash168)
|
||||||
|
|
||||||
def get_history(self, hash168, limit=1000):
|
|
||||||
'''Generator that returns an unpruned, sorted list of (tx_hash,
|
|
||||||
height) tuples of confirmed transactions that touched the address,
|
|
||||||
earliest in the blockchain first. Includes both spending and
|
|
||||||
receiving transactions. By default yields at most 1000 entries.
|
|
||||||
Set limit to None to get them all.
|
|
||||||
'''
|
|
||||||
limit = self.resolve_limit(limit)
|
|
||||||
prefix = b'H' + hash168
|
|
||||||
for key, hist in self.db.iterator(prefix=prefix):
|
|
||||||
a = array.array('I')
|
|
||||||
a.frombytes(hist)
|
|
||||||
for tx_num in a:
|
|
||||||
if limit == 0:
|
|
||||||
return
|
|
||||||
yield self.get_tx_hash(tx_num)
|
|
||||||
limit -= 1
|
|
||||||
|
|
||||||
def get_balance(self, hash168):
|
|
||||||
'''Returns the confirmed balance of an address.'''
|
|
||||||
return sum(utxo.value for utxo in self.get_utxos(hash168, limit=None))
|
|
||||||
|
|
||||||
def get_utxos(self, hash168, limit=1000):
|
|
||||||
'''Generator that yields all UTXOs for an address sorted in no
|
|
||||||
particular order. By default yields at most 1000 entries.
|
|
||||||
Set limit to None to get them all.
|
|
||||||
'''
|
|
||||||
limit = self.resolve_limit(limit)
|
|
||||||
unpack = struct.unpack
|
|
||||||
prefix = b'u' + hash168
|
|
||||||
for k, v in self.db.iterator(prefix=prefix):
|
|
||||||
(tx_pos,) = unpack('<H', k[-2:])
|
|
||||||
|
|
||||||
for n in range(0, len(v), 12):
|
|
||||||
if limit == 0:
|
|
||||||
return
|
|
||||||
(tx_num,) = unpack('<I', v[n:n + 4])
|
|
||||||
(value,) = unpack('<Q', v[n + 4:n + 12])
|
|
||||||
tx_hash, height = self.get_tx_hash(tx_num)
|
|
||||||
yield UTXO(tx_num, tx_pos, tx_hash, height, value)
|
|
||||||
limit -= 1
|
|
||||||
|
|
||||||
def get_utxos_sorted(self, hash168):
|
|
||||||
'''Returns all the UTXOs for an address sorted by height and
|
|
||||||
position in the block.'''
|
|
||||||
return sorted(self.get_utxos(hash168, limit=None))
|
|
||||||
|
|
||||||
def get_utxo_hash168(self, tx_hash, index):
|
|
||||||
'''Returns the hash168 for a UTXO.'''
|
|
||||||
hash168 = None
|
|
||||||
if 0 <= index <= 65535:
|
|
||||||
idx_packed = struct.pack('<H', index)
|
|
||||||
hash168 = self.utxo_cache.hash168(tx_hash, idx_packed, False)
|
|
||||||
if hash168 == NO_CACHE_ENTRY:
|
|
||||||
hash168 = None
|
|
||||||
return hash168
|
|
||||||
|
|||||||
@ -83,9 +83,9 @@ class UTXOCache(LoggedClass):
|
|||||||
|
|
||||||
'''
|
'''
|
||||||
|
|
||||||
def __init__(self, parent, db, coin):
|
def __init__(self, get_tx_hash, db, coin):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
self.parent = parent
|
self.get_tx_hash = get_tx_hash
|
||||||
self.coin = coin
|
self.coin = coin
|
||||||
self.cache = {}
|
self.cache = {}
|
||||||
self.put = self.cache.__setitem__
|
self.put = self.cache.__setitem__
|
||||||
@ -137,7 +137,7 @@ class UTXOCache(LoggedClass):
|
|||||||
assert len(data) % 12 == 0
|
assert len(data) % 12 == 0
|
||||||
for n in range(0, len(data), 12):
|
for n in range(0, len(data), 12):
|
||||||
(tx_num, ) = struct.unpack('<I', data[n:n+4])
|
(tx_num, ) = struct.unpack('<I', data[n:n+4])
|
||||||
this_tx_hash, height = self.parent.get_tx_hash(tx_num)
|
this_tx_hash, height = self.get_tx_hash(tx_num)
|
||||||
if tx_hash == this_tx_hash:
|
if tx_hash == this_tx_hash:
|
||||||
result = hash168 + data[n:n+12]
|
result = hash168 + data[n:n+12]
|
||||||
if delete:
|
if delete:
|
||||||
@ -185,7 +185,7 @@ class UTXOCache(LoggedClass):
|
|||||||
# Resolve the compressed key collision using the TX number
|
# Resolve the compressed key collision using the TX number
|
||||||
for n in range(0, len(data), 25):
|
for n in range(0, len(data), 25):
|
||||||
(tx_num, ) = struct.unpack('<I', data[n+21:n+25])
|
(tx_num, ) = struct.unpack('<I', data[n+21:n+25])
|
||||||
my_hash, height = self.parent.get_tx_hash(tx_num)
|
my_hash, height = self.get_tx_hash(tx_num)
|
||||||
if my_hash == tx_hash:
|
if my_hash == tx_hash:
|
||||||
if delete:
|
if delete:
|
||||||
self.cache_write(key, data[:n] + data[n+25:])
|
self.cache_write(key, data[:n] + data[n+25:])
|
||||||
|
|||||||
142
server/db.py
Normal file
142
server/db.py
Normal file
@ -0,0 +1,142 @@
|
|||||||
|
# Copyright (c) 2016, Neil Booth
|
||||||
|
#
|
||||||
|
# All rights reserved.
|
||||||
|
#
|
||||||
|
# See the file "LICENCE" for information about the copyright
|
||||||
|
# and warranty status of this software.
|
||||||
|
|
||||||
|
'''Interface to the blockchain database.'''
|
||||||
|
|
||||||
|
import array
|
||||||
|
import ast
|
||||||
|
import struct
|
||||||
|
from collections import namedtuple
|
||||||
|
|
||||||
|
from server.cache import FSCache, UTXOCache, NO_CACHE_ENTRY
|
||||||
|
from lib.util import LoggedClass
|
||||||
|
from server.storage import open_db
|
||||||
|
|
||||||
|
UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")
|
||||||
|
|
||||||
|
|
||||||
|
class DB(LoggedClass):
|
||||||
|
'''Simple wrapper of the backend database for querying.
|
||||||
|
|
||||||
|
Performs no DB update, though the DB will be cleaned on opening if
|
||||||
|
it was shutdown uncleanly.
|
||||||
|
'''
|
||||||
|
|
||||||
|
def __init__(self, coin, db_engine):
|
||||||
|
super().__init__()
|
||||||
|
self.coin = coin
|
||||||
|
|
||||||
|
# Open DB and metadata files. Record some of its state.
|
||||||
|
db_name = '{}-{}'.format(coin.NAME, coin.NET)
|
||||||
|
self.db = open_db(db_name, db_engine)
|
||||||
|
if self.db.is_new:
|
||||||
|
self.logger.info('created new {} database {}'
|
||||||
|
.format(db_engine, db_name))
|
||||||
|
else:
|
||||||
|
self.logger.info('successfully opened {} database {}'
|
||||||
|
.format(db_engine, db_name))
|
||||||
|
|
||||||
|
self.init_state_from_db()
|
||||||
|
self.tx_count = self.db_tx_count
|
||||||
|
self.height = self.db_height
|
||||||
|
self.tip = self.db_tip
|
||||||
|
|
||||||
|
# Cache wrapping the filesystem and redirected functions
|
||||||
|
self.fs_cache = FSCache(self.coin, self.height, self.tx_count)
|
||||||
|
self.get_tx_hash = self.fs_cache.get_tx_hash
|
||||||
|
self.read_headers = self.fs_cache.read_headers
|
||||||
|
|
||||||
|
# UTXO cache
|
||||||
|
self.utxo_cache = UTXOCache(self.get_tx_hash, self.db, self.coin)
|
||||||
|
|
||||||
|
def init_state_from_db(self):
|
||||||
|
if self.db.is_new:
|
||||||
|
self.db_height = -1
|
||||||
|
self.db_tx_count = 0
|
||||||
|
self.db_tip = b'\0' * 32
|
||||||
|
self.flush_count = 0
|
||||||
|
self.utxo_flush_count = 0
|
||||||
|
self.wall_time = 0
|
||||||
|
self.first_sync = True
|
||||||
|
else:
|
||||||
|
state = self.db.get(b'state')
|
||||||
|
state = ast.literal_eval(state.decode())
|
||||||
|
if state['genesis'] != self.coin.GENESIS_HASH:
|
||||||
|
raise ChainError('DB genesis hash {} does not match coin {}'
|
||||||
|
.format(state['genesis_hash'],
|
||||||
|
self.coin.GENESIS_HASH))
|
||||||
|
self.db_height = state['height']
|
||||||
|
self.db_tx_count = state['tx_count']
|
||||||
|
self.db_tip = state['tip']
|
||||||
|
self.flush_count = state['flush_count']
|
||||||
|
self.utxo_flush_count = state['utxo_flush_count']
|
||||||
|
self.wall_time = state['wall_time']
|
||||||
|
self.first_sync = state.get('first_sync', True)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _resolve_limit(limit):
|
||||||
|
if limit is None:
|
||||||
|
return -1
|
||||||
|
assert isinstance(limit, int) and limit >= 0
|
||||||
|
return limit
|
||||||
|
|
||||||
|
def get_history(self, hash168, limit=1000):
|
||||||
|
'''Generator that returns an unpruned, sorted list of (tx_hash,
|
||||||
|
height) tuples of confirmed transactions that touched the address,
|
||||||
|
earliest in the blockchain first. Includes both spending and
|
||||||
|
receiving transactions. By default yields at most 1000 entries.
|
||||||
|
Set limit to None to get them all.
|
||||||
|
'''
|
||||||
|
limit = self._resolve_limit(limit)
|
||||||
|
prefix = b'H' + hash168
|
||||||
|
for key, hist in self.db.iterator(prefix=prefix):
|
||||||
|
a = array.array('I')
|
||||||
|
a.frombytes(hist)
|
||||||
|
for tx_num in a:
|
||||||
|
if limit == 0:
|
||||||
|
return
|
||||||
|
yield self.get_tx_hash(tx_num)
|
||||||
|
limit -= 1
|
||||||
|
|
||||||
|
def get_balance(self, hash168):
|
||||||
|
'''Returns the confirmed balance of an address.'''
|
||||||
|
return sum(utxo.value for utxo in self.get_utxos(hash168, limit=None))
|
||||||
|
|
||||||
|
def get_utxos(self, hash168, limit=1000):
|
||||||
|
'''Generator that yields all UTXOs for an address sorted in no
|
||||||
|
particular order. By default yields at most 1000 entries.
|
||||||
|
Set limit to None to get them all.
|
||||||
|
'''
|
||||||
|
limit = self._resolve_limit(limit)
|
||||||
|
unpack = struct.unpack
|
||||||
|
prefix = b'u' + hash168
|
||||||
|
for k, v in self.db.iterator(prefix=prefix):
|
||||||
|
(tx_pos,) = unpack('<H', k[-2:])
|
||||||
|
|
||||||
|
for n in range(0, len(v), 12):
|
||||||
|
if limit == 0:
|
||||||
|
return
|
||||||
|
(tx_num,) = unpack('<I', v[n:n + 4])
|
||||||
|
(value,) = unpack('<Q', v[n + 4:n + 12])
|
||||||
|
tx_hash, height = self.get_tx_hash(tx_num)
|
||||||
|
yield UTXO(tx_num, tx_pos, tx_hash, height, value)
|
||||||
|
limit -= 1
|
||||||
|
|
||||||
|
def get_utxos_sorted(self, hash168):
|
||||||
|
'''Returns all the UTXOs for an address sorted by height and
|
||||||
|
position in the block.'''
|
||||||
|
return sorted(self.get_utxos(hash168, limit=None))
|
||||||
|
|
||||||
|
def get_utxo_hash168(self, tx_hash, index):
|
||||||
|
'''Returns the hash168 for a UTXO.'''
|
||||||
|
hash168 = None
|
||||||
|
if 0 <= index <= 65535:
|
||||||
|
idx_packed = struct.pack('<H', index)
|
||||||
|
hash168 = self.utxo_cache.hash168(tx_hash, idx_packed, False)
|
||||||
|
if hash168 == NO_CACHE_ENTRY:
|
||||||
|
hash168 = None
|
||||||
|
return hash168
|
||||||
Loading…
Reference in New Issue
Block a user