Merge branch 'mc-initialization' into devel
This commit is contained in:
commit
09edee6091
@ -50,7 +50,7 @@ Return the block header at the given height.
|
|||||||
|
|
||||||
**Example Result**
|
**Example Result**
|
||||||
|
|
||||||
With *cp_height* zero:
|
With *height* 5 and *cp_height* 0 on the Bitcoin Cash chain:
|
||||||
|
|
||||||
::
|
::
|
||||||
|
|
||||||
@ -58,7 +58,7 @@ With *cp_height* zero:
|
|||||||
|
|
||||||
.. _cp_height example:
|
.. _cp_height example:
|
||||||
|
|
||||||
With *cp_height* 8 on the Bitcoin Cash chain::
|
With *cp_height* 8::
|
||||||
|
|
||||||
{
|
{
|
||||||
"branch": [
|
"branch": [
|
||||||
|
|||||||
@ -28,6 +28,8 @@
|
|||||||
|
|
||||||
from math import ceil, log
|
from math import ceil, log
|
||||||
|
|
||||||
|
from aiorpcx import Event
|
||||||
|
|
||||||
from electrumx.lib.hash import double_sha256
|
from electrumx.lib.hash import double_sha256
|
||||||
|
|
||||||
|
|
||||||
@ -168,6 +170,7 @@ class MerkleCache(object):
|
|||||||
self.source_func = source_func
|
self.source_func = source_func
|
||||||
self.length = 0
|
self.length = 0
|
||||||
self.depth_higher = 0
|
self.depth_higher = 0
|
||||||
|
self.initialized = Event()
|
||||||
|
|
||||||
def _segment_length(self):
|
def _segment_length(self):
|
||||||
return 1 << self.depth_higher
|
return 1 << self.depth_higher
|
||||||
@ -210,6 +213,7 @@ class MerkleCache(object):
|
|||||||
self.length = length
|
self.length = length
|
||||||
self.depth_higher = self.merkle.tree_depth(length) // 2
|
self.depth_higher = self.merkle.tree_depth(length) // 2
|
||||||
self.level = self._level(await self.source_func(0, length))
|
self.level = self._level(await self.source_func(0, length))
|
||||||
|
self.initialized.set()
|
||||||
|
|
||||||
def truncate(self, length):
|
def truncate(self, length):
|
||||||
'''Truncate the cache so it covers no more than length underlying
|
'''Truncate the cache so it covers no more than length underlying
|
||||||
@ -238,6 +242,7 @@ class MerkleCache(object):
|
|||||||
raise ValueError('length must be positive')
|
raise ValueError('length must be positive')
|
||||||
if index >= length:
|
if index >= length:
|
||||||
raise ValueError('index must be less than length')
|
raise ValueError('index must be less than length')
|
||||||
|
await self.initialized.wait()
|
||||||
await self._extend_to(length)
|
await self._extend_to(length)
|
||||||
leaf_start = self._leaf_start(index)
|
leaf_start = self._leaf_start(index)
|
||||||
count = min(self._segment_length(), length - leaf_start)
|
count = min(self._segment_length(), length - leaf_start)
|
||||||
|
|||||||
@ -20,7 +20,6 @@ from aiorpcx import TaskGroup, run_in_thread
|
|||||||
import electrumx
|
import electrumx
|
||||||
from electrumx.server.daemon import DaemonError
|
from electrumx.server.daemon import DaemonError
|
||||||
from electrumx.lib.hash import hash_to_hex_str, HASHX_LEN
|
from electrumx.lib.hash import hash_to_hex_str, HASHX_LEN
|
||||||
from electrumx.lib.merkle import Merkle, MerkleCache
|
|
||||||
from electrumx.lib.util import chunks, formatted_time, class_logger
|
from electrumx.lib.util import chunks, formatted_time, class_logger
|
||||||
import electrumx.server.db
|
import electrumx.server.db
|
||||||
|
|
||||||
@ -166,10 +165,6 @@ class BlockProcessor(electrumx.server.db.DB):
|
|||||||
self.touched = set()
|
self.touched = set()
|
||||||
self.reorg_count = 0
|
self.reorg_count = 0
|
||||||
|
|
||||||
# Header merkle cache
|
|
||||||
self.merkle = Merkle()
|
|
||||||
self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes)
|
|
||||||
|
|
||||||
# Caches of unflushed items.
|
# Caches of unflushed items.
|
||||||
self.headers = []
|
self.headers = []
|
||||||
self.tx_hashes = []
|
self.tx_hashes = []
|
||||||
@ -268,8 +263,6 @@ class BlockProcessor(electrumx.server.db.DB):
|
|||||||
await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks)
|
await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks)
|
||||||
await self.backup_flush()
|
await self.backup_flush()
|
||||||
last -= len(raw_blocks)
|
last -= len(raw_blocks)
|
||||||
# Truncate header_mc: header count is 1 more than the height.
|
|
||||||
self.header_mc.truncate(self.height + 1)
|
|
||||||
await self.prefetcher.reset_height(self.height)
|
await self.prefetcher.reset_height(self.height)
|
||||||
|
|
||||||
async def reorg_hashes(self, count):
|
async def reorg_hashes(self, count):
|
||||||
@ -429,9 +422,7 @@ class BlockProcessor(electrumx.server.db.DB):
|
|||||||
'''
|
'''
|
||||||
flush_start = time.time()
|
flush_start = time.time()
|
||||||
|
|
||||||
# Backup FS (just move the pointers back)
|
self.backup_fs(self.height, self.tx_count)
|
||||||
self.fs_height = self.height
|
|
||||||
self.fs_tx_count = self.tx_count
|
|
||||||
|
|
||||||
# Backup history. self.touched can include other addresses
|
# Backup history. self.touched can include other addresses
|
||||||
# which is harmless, but remove None.
|
# which is harmless, but remove None.
|
||||||
@ -776,10 +767,6 @@ class BlockProcessor(electrumx.server.db.DB):
|
|||||||
await self.notifications.on_block(set(), self.height)
|
await self.notifications.on_block(set(), self.height)
|
||||||
# Reopen for serving
|
# Reopen for serving
|
||||||
await self.open_for_serving()
|
await self.open_for_serving()
|
||||||
# Populate the header merkle cache
|
|
||||||
length = max(1, self.height - self.env.reorg_limit)
|
|
||||||
await self.header_mc.initialize(length)
|
|
||||||
self.logger.info('populated header merkle cache')
|
|
||||||
|
|
||||||
async def _first_open_dbs(self):
|
async def _first_open_dbs(self):
|
||||||
await self.open_for_sync()
|
await self.open_for_sync()
|
||||||
|
|||||||
@ -25,6 +25,7 @@ class ChainState(object):
|
|||||||
self.read_headers = self._bp.read_headers
|
self.read_headers = self._bp.read_headers
|
||||||
self.all_utxos = self._bp.all_utxos
|
self.all_utxos = self._bp.all_utxos
|
||||||
self.limited_history = self._bp.limited_history
|
self.limited_history = self._bp.limited_history
|
||||||
|
self.header_branch_and_root = self._bp.header_branch_and_root
|
||||||
|
|
||||||
async def broadcast_transaction(self, raw_tx):
|
async def broadcast_transaction(self, raw_tx):
|
||||||
return await self._daemon.sendrawtransaction([raw_tx])
|
return await self._daemon.sendrawtransaction([raw_tx])
|
||||||
@ -43,9 +44,6 @@ class ChainState(object):
|
|||||||
'db_height': self.db_height(),
|
'db_height': self.db_height(),
|
||||||
}
|
}
|
||||||
|
|
||||||
async def header_branch_and_root(self, length, height):
|
|
||||||
return self._bp.header_mc.branch_and_root(length, height)
|
|
||||||
|
|
||||||
async def raw_header(self, height):
|
async def raw_header(self, height):
|
||||||
'''Return the binary header at the given height.'''
|
'''Return the binary header at the given height.'''
|
||||||
header, n = await self.read_headers(height, 1)
|
header, n = await self.read_headers(height, 1)
|
||||||
|
|||||||
@ -108,6 +108,7 @@ class Controller(ServerBase):
|
|||||||
await group.spawn(session_mgr.serve(serve_externally_event))
|
await group.spawn(session_mgr.serve(serve_externally_event))
|
||||||
await group.spawn(bp.fetch_and_process_blocks(caught_up_event))
|
await group.spawn(bp.fetch_and_process_blocks(caught_up_event))
|
||||||
await caught_up_event.wait()
|
await caught_up_event.wait()
|
||||||
|
await group.spawn(bp.populate_header_merkle_cache())
|
||||||
await group.spawn(mempool.keep_synchronized(synchronized_event))
|
await group.spawn(mempool.keep_synchronized(synchronized_event))
|
||||||
await synchronized_event.wait()
|
await synchronized_event.wait()
|
||||||
serve_externally_event.set()
|
serve_externally_event.set()
|
||||||
|
|||||||
@ -12,6 +12,7 @@
|
|||||||
import array
|
import array
|
||||||
import ast
|
import ast
|
||||||
import os
|
import os
|
||||||
|
import time
|
||||||
from bisect import bisect_right
|
from bisect import bisect_right
|
||||||
from collections import namedtuple
|
from collections import namedtuple
|
||||||
from glob import glob
|
from glob import glob
|
||||||
@ -21,6 +22,7 @@ from aiorpcx import run_in_thread
|
|||||||
|
|
||||||
import electrumx.lib.util as util
|
import electrumx.lib.util as util
|
||||||
from electrumx.lib.hash import hash_to_hex_str, HASHX_LEN
|
from electrumx.lib.hash import hash_to_hex_str, HASHX_LEN
|
||||||
|
from electrumx.lib.merkle import Merkle, MerkleCache
|
||||||
from electrumx.server.storage import db_class
|
from electrumx.server.storage import db_class
|
||||||
from electrumx.server.history import History
|
from electrumx.server.history import History
|
||||||
|
|
||||||
@ -63,6 +65,10 @@ class DB(object):
|
|||||||
|
|
||||||
self.logger.info(f'using {self.env.db_engine} for DB backend')
|
self.logger.info(f'using {self.env.db_engine} for DB backend')
|
||||||
|
|
||||||
|
# Header merkle cache
|
||||||
|
self.merkle = Merkle()
|
||||||
|
self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes)
|
||||||
|
|
||||||
self.headers_file = util.LogicalFile('meta/headers', 2, 16000000)
|
self.headers_file = util.LogicalFile('meta/headers', 2, 16000000)
|
||||||
self.tx_counts_file = util.LogicalFile('meta/txcounts', 2, 2000000)
|
self.tx_counts_file = util.LogicalFile('meta/txcounts', 2, 2000000)
|
||||||
self.hashes_file = util.LogicalFile('meta/hashes', 4, 16000000)
|
self.hashes_file = util.LogicalFile('meta/hashes', 4, 16000000)
|
||||||
@ -130,6 +136,19 @@ class DB(object):
|
|||||||
self.utxo_db = None
|
self.utxo_db = None
|
||||||
await self._open_dbs(False)
|
await self._open_dbs(False)
|
||||||
|
|
||||||
|
# Header merkle cache
|
||||||
|
|
||||||
|
async def populate_header_merkle_cache(self):
|
||||||
|
self.logger.info('populating header merkle cache...')
|
||||||
|
length = max(1, self.height - self.env.reorg_limit)
|
||||||
|
start = time.time()
|
||||||
|
await self.header_mc.initialize(length)
|
||||||
|
elapsed = time.time() - start
|
||||||
|
self.logger.info(f'header merkle cache populated in {elapsed:.1f}s')
|
||||||
|
|
||||||
|
async def header_branch_and_root(self, length, height):
|
||||||
|
return await self.header_mc.branch_and_root(length, height)
|
||||||
|
|
||||||
def fs_update_header_offsets(self, offset_start, height_start, headers):
|
def fs_update_header_offsets(self, offset_start, height_start, headers):
|
||||||
if self.coin.STATIC_BLOCK_HEADERS:
|
if self.coin.STATIC_BLOCK_HEADERS:
|
||||||
return
|
return
|
||||||
@ -152,6 +171,13 @@ class DB(object):
|
|||||||
return self.dynamic_header_offset(height + 1)\
|
return self.dynamic_header_offset(height + 1)\
|
||||||
- self.dynamic_header_offset(height)
|
- self.dynamic_header_offset(height)
|
||||||
|
|
||||||
|
def backup_fs(self, height, tx_count):
|
||||||
|
'''Back up during a reorg. This just updates our pointers.'''
|
||||||
|
self.fs_height = height
|
||||||
|
self.fs_tx_count = tx_count
|
||||||
|
# Truncate header_mc: header count is 1 more than the height.
|
||||||
|
self.header_mc.truncate(height + 1)
|
||||||
|
|
||||||
def fs_update(self, fs_height, headers, block_tx_hashes):
|
def fs_update(self, fs_height, headers, block_tx_hashes):
|
||||||
'''Write headers, the tx_count array and block tx hashes to disk.
|
'''Write headers, the tx_count array and block tx hashes to disk.
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user