Have the header merkle cache handle reorgs
Cleanest to move the mc_cache to the block processor
This commit is contained in:
parent
e5ea1c10e2
commit
8630c9136c
@ -203,7 +203,8 @@ class MerkleCache(object):
|
|||||||
return level
|
return level
|
||||||
|
|
||||||
def truncate(self, length):
|
def truncate(self, length):
|
||||||
'''Truncate the cache so it is no longer than length.'''
|
'''Truncate the cache so it covers no more than length underlying
|
||||||
|
hashes.'''
|
||||||
if not isinstance(length, int):
|
if not isinstance(length, int):
|
||||||
raise TypeError('length must be an integer')
|
raise TypeError('length must be an integer')
|
||||||
if length <= 0:
|
if length <= 0:
|
||||||
|
|||||||
@ -18,6 +18,7 @@ from functools import partial
|
|||||||
import electrumx
|
import electrumx
|
||||||
from electrumx.server.daemon import DaemonError
|
from electrumx.server.daemon import DaemonError
|
||||||
from electrumx.lib.hash import hash_to_hex_str, HASHX_LEN
|
from electrumx.lib.hash import hash_to_hex_str, HASHX_LEN
|
||||||
|
from electrumx.lib.merkle import Merkle, MerkleCache
|
||||||
from electrumx.lib.util import chunks, formatted_time, class_logger
|
from electrumx.lib.util import chunks, formatted_time, class_logger
|
||||||
import electrumx.server.db
|
import electrumx.server.db
|
||||||
|
|
||||||
@ -128,6 +129,12 @@ class Prefetcher(object):
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
class HeaderSource(object):
|
||||||
|
|
||||||
|
def __init__(self, db):
|
||||||
|
self.hashes = db.fs_block_hashes
|
||||||
|
|
||||||
|
|
||||||
class ChainError(Exception):
|
class ChainError(Exception):
|
||||||
'''Raised on error processing blocks.'''
|
'''Raised on error processing blocks.'''
|
||||||
|
|
||||||
@ -166,6 +173,10 @@ class BlockProcessor(electrumx.server.db.DB):
|
|||||||
self.last_flush_tx_count = self.tx_count
|
self.last_flush_tx_count = self.tx_count
|
||||||
self.touched = set()
|
self.touched = set()
|
||||||
|
|
||||||
|
# Header merkle cache
|
||||||
|
self.merkle = Merkle()
|
||||||
|
self.header_mc = None
|
||||||
|
|
||||||
# Caches of unflushed items.
|
# Caches of unflushed items.
|
||||||
self.headers = []
|
self.headers = []
|
||||||
self.tx_hashes = []
|
self.tx_hashes = []
|
||||||
@ -220,6 +231,12 @@ class BlockProcessor(electrumx.server.db.DB):
|
|||||||
self.logger.info(f'{electrumx.version} synced to '
|
self.logger.info(f'{electrumx.version} synced to '
|
||||||
f'height {self.height:,d}')
|
f'height {self.height:,d}')
|
||||||
self.open_dbs()
|
self.open_dbs()
|
||||||
|
self.logger.info(f'caught up to height {self.height:,d}')
|
||||||
|
length = max(1, self.height - self.env.reorg_limit)
|
||||||
|
self.header_mc = MerkleCache(self.merkle, HeaderSource(self), length)
|
||||||
|
self.logger.info('populated header merkle cache')
|
||||||
|
|
||||||
|
# Reorgs use header_mc so safest to set this after initializing it
|
||||||
self.caught_up_event.set()
|
self.caught_up_event.set()
|
||||||
|
|
||||||
async def check_and_advance_blocks(self, raw_blocks, first):
|
async def check_and_advance_blocks(self, raw_blocks, first):
|
||||||
@ -291,6 +308,8 @@ class BlockProcessor(electrumx.server.db.DB):
|
|||||||
for hex_hashes in chunks(hashes, 50):
|
for hex_hashes in chunks(hashes, 50):
|
||||||
blocks = await self.daemon.raw_blocks(hex_hashes)
|
blocks = await self.daemon.raw_blocks(hex_hashes)
|
||||||
await self.controller.run_in_executor(self.backup_blocks, blocks)
|
await self.controller.run_in_executor(self.backup_blocks, blocks)
|
||||||
|
# Truncate header_mc: header count is 1 more than the height
|
||||||
|
self.header_mc.truncate(self.height + 1)
|
||||||
await self.prefetcher.reset_height()
|
await self.prefetcher.reset_height()
|
||||||
|
|
||||||
async def reorg_hashes(self, count):
|
async def reorg_hashes(self, count):
|
||||||
|
|||||||
@ -23,7 +23,6 @@ from aiorpcx import RPCError, TaskSet, _version as aiorpcx_version
|
|||||||
import electrumx
|
import electrumx
|
||||||
from electrumx.lib.hash import hash_to_hex_str, hex_str_to_hash
|
from electrumx.lib.hash import hash_to_hex_str, hex_str_to_hash
|
||||||
from electrumx.lib.hash import HASHX_LEN
|
from electrumx.lib.hash import HASHX_LEN
|
||||||
from electrumx.lib.merkle import Merkle, MerkleCache
|
|
||||||
from electrumx.lib.peer import Peer
|
from electrumx.lib.peer import Peer
|
||||||
from electrumx.lib.server_base import ServerBase
|
from electrumx.lib.server_base import ServerBase
|
||||||
import electrumx.lib.util as util
|
import electrumx.lib.util as util
|
||||||
@ -34,13 +33,6 @@ from electrumx.server.session import LocalRPC, BAD_REQUEST, DAEMON_ERROR
|
|||||||
|
|
||||||
|
|
||||||
version_string = util.version_string
|
version_string = util.version_string
|
||||||
merkle = Merkle()
|
|
||||||
|
|
||||||
|
|
||||||
class HeaderSource(object):
|
|
||||||
|
|
||||||
def __init__(self, db):
|
|
||||||
self.hashes = db.fs_block_hashes
|
|
||||||
|
|
||||||
|
|
||||||
class SessionGroup(object):
|
class SessionGroup(object):
|
||||||
@ -235,11 +227,6 @@ class Controller(ServerBase):
|
|||||||
'''Wait for the block processor to catch up, and for the mempool to
|
'''Wait for the block processor to catch up, and for the mempool to
|
||||||
synchronize, then kick off server background processes.'''
|
synchronize, then kick off server background processes.'''
|
||||||
await self.bp.caught_up_event.wait()
|
await self.bp.caught_up_event.wait()
|
||||||
self.logger.info('block processor has caught up')
|
|
||||||
length = max(1, self.bp.db_height - self.env.reorg_limit)
|
|
||||||
source = HeaderSource(self.bp)
|
|
||||||
self.header_mc = MerkleCache(merkle, source, length)
|
|
||||||
self.logger.info('populated header merkle cache')
|
|
||||||
self.create_task(self.mempool.main_loop())
|
self.create_task(self.mempool.main_loop())
|
||||||
await self.mempool.synchronized_event.wait()
|
await self.mempool.synchronized_event.wait()
|
||||||
self.create_task(self.peer_mgr.main_loop())
|
self.create_task(self.peer_mgr.main_loop())
|
||||||
@ -867,7 +854,7 @@ class Controller(ServerBase):
|
|||||||
f'block {block_hash} at height {height:,d}')
|
f'block {block_hash} at height {height:,d}')
|
||||||
|
|
||||||
hashes = [hex_str_to_hash(hash) for hash in tx_hashes]
|
hashes = [hex_str_to_hash(hash) for hash in tx_hashes]
|
||||||
branch, root = merkle.branch_and_root(hashes, pos)
|
branch, root = self.bp.merkle.branch_and_root(hashes, pos)
|
||||||
branch = [hash_to_hex_str(hash) for hash in branch]
|
branch = [hash_to_hex_str(hash) for hash in branch]
|
||||||
|
|
||||||
return {"block_height": height, "merkle": branch, "pos": pos}
|
return {"block_height": height, "merkle": branch, "pos": pos}
|
||||||
|
|||||||
@ -305,8 +305,7 @@ class ElectrumX(SessionBase):
|
|||||||
f'require header height {height:,d} <= '
|
f'require header height {height:,d} <= '
|
||||||
f'cp_height {cp_height:,d} <= '
|
f'cp_height {cp_height:,d} <= '
|
||||||
f'chain height {max_height:,d}')
|
f'chain height {max_height:,d}')
|
||||||
branch, root = self.controller.header_mc.branch_and_root(
|
branch, root = self.bp.header_mc.branch_and_root(cp_height + 1, height)
|
||||||
cp_height + 1, height)
|
|
||||||
return {
|
return {
|
||||||
'branch': [hash_to_hex_str(elt) for elt in branch],
|
'branch': [hash_to_hex_str(elt) for elt in branch],
|
||||||
'root': hash_to_hex_str(root),
|
'root': hash_to_hex_str(root),
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user