Merge branch 'devel'

This commit is contained in:
Neil Booth 2018-08-11 18:21:22 +09:00
commit 9e5b939dc6
21 changed files with 1606 additions and 705 deletions

View File

@ -23,6 +23,7 @@ install:
- pip install pycodestyle
- pip install pylru
- pip install pyrocksdb
- pip install pytest-asyncio
- pip install pytest-cov
- pip install Sphinx
# hashes

View File

@ -48,7 +48,7 @@ async def compact_history():
environ['DAEMON_URL'] = '' # Avoid Env erroring out
env = Env()
db = DB(env)
await db.open_for_sync()
await db.open_for_compacting()
assert not db.first_sync
history = db.history

View File

@ -62,7 +62,7 @@ async def query(args):
db = DB(env)
coin = env.coin
await db._open_dbs(False)
await db.open_for_serving()
if not args.scripts:
await print_stats(db.hist_db, db.utxo_db)
@ -73,20 +73,23 @@ async def query(args):
if not hashX:
continue
n = None
for n, (tx_hash, height) in enumerate(db.get_history(hashX, limit),
start=1):
history = await db.limited_history(hashX, limit=limit)
for n, (tx_hash, height) in enumerate(history, start=1):
print(f'History #{n:,d}: height {height:,d} '
f'tx_hash {hash_to_hex_str(tx_hash)}')
if n is None:
print('No history found')
n = None
for n, utxo in enumerate(db.get_utxos(hashX, limit), start=1):
utxos = await db.all_utxos(hashX)
for n, utxo in enumerate(utxos, start=1):
print(f'UTXO #{n:,d}: tx_hash {hash_to_hex_str(utxo.tx_hash)} '
f'tx_pos {utxo.tx_pos:,d} height {utxo.height:,d} '
f'value {utxo.value:,d}')
if n == limit:
break
if n is None:
print('No UTXOs found')
balance = db.get_balance(hashX)
balance = sum(utxo.value for utxo in utxos)
print(f'Balance: {coin.decimal_value(balance):,f} {coin.SHORTNAME}')

View File

@ -146,3 +146,63 @@ Removed methods
* :func:`blockchain.block.get_header`
* :func:`blockchain.block.get_chunk`
Version 1.5
===========
This protocol version makes changes intended to allow clients and
servers to more easily scale to support queries about busy addresses.
It has changes to reduce the amount of round-trip queries made in
common usage, and to make results more compact to reduce bandwidth
consumption.
RPC calls with potentially large responses have pagination support,
and the return value of :func:`blockchain.scripthash.subscribe`
changes. Script hash :ref:`status <status>` had to be recalculated
with each new transaction and was undefined if it included more than
one mempool transaction. Its calculation is linear in history length
resulting in quadratic complexity as history grows. Its calculation
for large histories was demanding for both the server to compute and
the client to check.
RPC calls and notifications that combined the effects of the mempool
and confirmed history are removed.
The changes are beneficial to clients and servers alike, but will
require changes to both client-side and server-side logic. In
particular, the client should track what block (by hash and height)
wallet data is synchronized to, and if that hash is no longer part of
the main chain, it will need to remove wallet data for blocks that
were reorganized away and get updated information as of the first
reorganized block. The effects are limited to script hashes
potentially affected by the reorg, and for most clients this will be
the empty set.
New methods
-----------
* :func:`blockchain.scripthash.history`
* :func:`blockchain.scripthash.utxos`
New notifications
-----------------
* :func:`mempool.changes`
Changes
-------
* :func:`blockchain.scripthash.subscribe` has changed its return value
and the notifications it sends
* :func:`blockchain.transaction.get` takes an additional optional
argument *merkle*
Removed methods
---------------
* :func:`blockchain.scripthash.get_history`. Switch to
:func:`blockchain.scripthash.history`
* :func:`blockchain.scripthash.get_mempool`. Switch to
handling :func:`mempool.changes` notifications
* :func:`blockchain.scripthash.listunspent`. Switch to
:func:`blockchain.scripthash.utxos`

View File

@ -50,7 +50,7 @@ Return the block header at the given height.
**Example Result**
With *cp_height* zero:
With *height* 5 and *cp_height* 0 on the Bitcoin Cash chain:
::
@ -58,7 +58,7 @@ With *cp_height* zero:
.. _cp_height example:
With *cp_height* 8 on the Bitcoin Cash chain::
With *cp_height* 8::
{
"branch": [
@ -310,166 +310,68 @@ Return the confirmed and unconfirmed balances of a :ref:`script hash
"unconfirmed": "0.236844"
}
blockchain.scripthash.get_history
=================================
blockchain.scripthash.history
=============================
Return the confirmed and unconfirmed history of a :ref:`script hash
<script hashes>`.
Return part of the confirmed history of a :ref:`script hash <script
hashes>`.
**Signature**
.. function:: blockchain.scripthash.get_history(scripthash)
.. versionadded:: 1.1
.. function:: blockchain.scripthash.history(scripthash, start_height)
.. versionadded:: 1.5
*scripthash*
The script hash as a hexadecimal string.
*start_height*
History will be returned starting from this height, a non-negative
integer. If there are several matching transactions in a block,
the server will return *all* of them -- partial results from a
block are not permitted. The client can start subsequent requests
at one above the greatest returned height and avoid repeats.
**Result**
A list of confirmed transactions in blockchain order, with the
output of :func:`blockchain.scripthash.get_mempool` appended to the
list. Each confirmed transaction is a dictionary with the following
keys:
A dictionary with the following keys.
* *height*
* *more*
The integer height of the block the transaction was confirmed in.
:const:`true` indicates that there *may* be more history
available. A follow-up request is required to obtain any.
:const:`false` means all history to blockchain's tip has been
returned.
* *tx_hash*
* *history*
The transaction hash in hexadecimal.
A list of transactions. Each transaction is itself a list of
two elements:
See :func:`blockchain.scripthash.get_mempool` for how mempool
transactions are returned.
1. The block height
2. The transaction hash
**Result Examples**
::
[
{
"height": 200004,
"tx_hash": "acc3758bd2a26f869fcc67d48ff30b96464d476bca82c1cd6656e7d506816412"
},
{
"height": 215008,
"tx_hash": "f3e1bf48975b8d6060a9de8884296abb80be618dc00ae3cb2f6cee3085e09403"
}
]
{
"more": false,
"history": [
[
200004,
"acc3758bd2a26f869fcc67d48ff30b96464d476bca82c1cd6656e7d506816412"
],
[
215008,
"f3e1bf48975b8d6060a9de8884296abb80be618dc00ae3cb2f6cee3085e09403"
]
]
}
::
[
{
"fee": 20000,
"height": 0,
"tx_hash": "9fbed79a1e970343fcd39f4a2d830a6bde6de0754ed2da70f489d0303ed558ec"
}
]
blockchain.scripthash.get_mempool
=================================
Return the unconfirmed transactions of a :ref:`script hash <script
hashes>`.
**Signature**
.. function:: blockchain.scripthash.get_mempool(scripthash)
.. versionadded:: 1.1
*scripthash*
The script hash as a hexadecimal string.
**Result**
A list of mempool transactions in arbitrary order. Each mempool
transaction is a dictionary with the following keys:
* *height*
``0`` if all inputs are confirmed, and ``-1`` otherwise.
* *tx_hash*
The transaction hash in hexadecimal.
* *fee*
The transaction fee in minimum coin units (satoshis).
**Result Example**
::
[
{
"tx_hash": "45381031132c57b2ff1cbe8d8d3920cf9ed25efd9a0beb764bdb2f24c7d1c7e3",
"height": 0,
"fee": 24310
}
]
blockchain.scripthash.listunspent
=================================
Return an ordered list of UTXOs sent to a script hash.
**Signature**
.. function:: blockchain.scripthash.listunspent(scripthash)
.. versionadded:: 1.1
*scripthash*
The script hash as a hexadecimal string.
**Result**
A list of unspent outputs in blockchain order. This function takes
the mempool into account. Mempool transactions paying to the
address are included at the end of the list in an undefined order.
Any output that is spent in the mempool does not appear. Each
output is a dictionary with the following keys:
* *height*
The integer height of the block the transaction was confirmed in.
``0`` if the transaction is in the mempool.
* *tx_pos*
The zero-based index of the output in the transaction's list of
outputs.
* *tx_hash*
The output's transaction hash as a hexadecimal string.
* *value*
The output's value in minimum coin units (satoshis).
**Result Example**
::
[
{
"tx_pos": 0,
"value": 45318048,
"tx_hash": "9f2c45a12db0144909b5db269415f7319179105982ac70ed80d76ea79d923ebf",
"height": 437146
},
{
"tx_pos": 0,
"value": 919195,
"tx_hash": "3d2290c93436a3e964cfc2f0950174d8847b1fbe3946432c4784e168da0f019f",
"height": 441696
}
]
.. _subscribed:
blockchain.scripthash.subscribe
===============================
@ -487,15 +389,89 @@ Subscribe to a script hash.
**Result**
The :ref:`status <status>` of the script hash.
.. versionchanged:: 1.5
As of protocol 1.5, the transaction hash of the last confirmed
transaction in blockchain order, or :const:`null` if there are none.
For protocol versions 1.4 and below, the :ref:`status <status>` of
the script hash.
**Notifications**
As this is a subscription, the client will receive a notification
when the :ref:`status <status>` of the script hash changes. Its
signature is
.. versionchanged:: 1.5
As this is a subscription, the client receives notifications when
the confirmed transaction history and/or associated mempool
transactions change.
As of protocol 1.5, the initial mempool and subsequent changes to it
are sent with :func:`mempool.changes` notifications. When confirmed
history changes, a notification with signature
.. function:: blockchain.scripthash.subscribe(scripthash, tx_hash)
is sent, where *tx_hash* is the hash of the last confirmed
transaction in blockchain order.
For protocol versions 1.4 and below, the client will receive a
notification when the :ref:`status <status>` of the script hash
changes. Its signature is
.. function:: blockchain.scripthash.subscribe(scripthash, status)
blockchain.scripthash.utxos
===========================
Return some confirmed UTXOs sent to a script hash.
**Signature**
.. function:: blockchain.scripthash.utxos(scripthash, start_height)
.. versionadded:: 1.5
*scripthash*
The script hash as a hexadecimal string.
*start_height*
UTXOs will be returned starting from this height, a non-negative
integer. If there are several UTXOs in one block, the server will
return *all* of them -- partial results from a block are not
permitted. The client can start subsequent requests at one above
the greatest returned height and avoid repeats.
.. note:: To get the effects of transactions in the mempool adding or
removing UTXOs, a client must
:func:`blockchain.scripthash.subscribe` and track mempool
transactions sent via :func:`mempool.changes` notifications.
**Result**
A dictionary with the following keys.
* *more*
:const:`true` indicates that there *may* be more UTXOs available.
A follow-up request is required to obtain any. :const:`false`
means all UTXOs to the blockchain's tip have been returned.
* *utxos*
A list of UTXOs. Each UTXO is itself a list with the following
elements:
1. The height of the block the transaction is in
2. The transaction hash as a hexadecimal string
3. The zero-based index of the output in the transaction's outputs
4. The output value, an integer in minimum coin units (satoshis)
**Result Example**
::
**TODO**
.. function:: blockchain.scripthash.subscribe(scripthash, status)
blockchain.transaction.broadcast
================================
@ -542,11 +518,13 @@ Return a raw transaction.
**Signature**
.. function:: blockchain.transaction.get(tx_hash, verbose=false)
.. function:: blockchain.transaction.get(tx_hash, verbose=false, merkle=false)
.. versionchanged:: 1.1
ignored argument *height* removed
.. versionchanged:: 1.2
*verbose* argument added
.. versionchanged:: 1.5
*merkle* argument added
*tx_hash*
@ -556,16 +534,38 @@ Return a raw transaction.
Whether a verbose coin-specific response is required.
*merkle*
Whether a merkle branch proof should be returned as well.
**Result**
If *verbose* is :const:`false`, the raw transaction as a
hexadecimal string. If :const:`true`, the result is coin-specific
and whatever the coin daemon returns when asked for a verbose form
of the raw transaction.
If *verbose* is :const:`false`:
If *merkle* is :const:`false`, the raw transaction as a
hexadecimal string. If :const:`true`, the dictionary returned
by :func:`blockchain.transaction.get_merkle` with an additional
key:
*hex*
The raw transaction as a hexadecimal string.
If *verbose* is :const:`true`:
The result is a coin-specific dictionary -- whatever the coin
daemon returns when asked for a verbose form of the raw
transaction. If *merkle* is :const:`true` it will have an
additional key:
*merkle*
The dictionary returned by
:func:`blockchain.transaction.get_merkle`.
**Example Results**
When *verbose* is :const:`false`::
When *verbose* is :const:`false` and *merkle* is :const:`false`::
"01000000015bb9142c960a838329694d3fe9ba08c2a6421c5158d8f7044cb7c48006c1b48"
"4000000006a4730440220229ea5359a63c2b83a713fcc20d8c41b20d48fe639a639d2a824"
@ -575,7 +575,7 @@ When *verbose* is :const:`false`::
"4fe5f88ac50a8cf00000000001976a91445dac110239a7a3814535c15858b939211f85298"
"88ac61ee0700"
When *verbose* is :const:`true`::
When *verbose* is :const:`true` and *merkle* is :const:`false`::
{
"blockhash": "0000000000000000015a4f37ece911e5e3549f988e855548ce7494a0a08b2ad6",
@ -735,6 +735,52 @@ When *merkle* is :const:`true`::
]
}
mempool.changes
===============
A notification that indicates changes to unconfirmed transactions of a
:ref:`subscribed <subscribed>` :ref:`script hash <script hashes>`. As
its name suggests the notification is stateful; its contents are a
function of what was sent previously.
**Signature**
.. function:: mempool.changes(scripthash, new, gone)
.. versionadded:: 1.5
The parameters are as follows:
* *scripthash*
The script hash the notification is for, a hexadecimal string.
* *new*
A list of transactions in the mempool that have not previously
been sent to the client, or whose *confirmed input* status
has changed. Each transaction is an ordered list of 3 items:
1. The raw transaction or its hash as a hexadecimal string. The
first time the server sends a transaction it sends it raw.
Subsequent references in the same *new* list or in later
notifications will send the hash only. Transactions cannot be
32 bytes in size so length can be used to distinguish.
2. The transaction fee, an integer in minimum coin units (satoshis)
3. :const:`true` if all inputs are confirmed otherwise :const:`false`
* *gone*
A list of hashes of transactions that were previously sent to the
client as being in the mempool but no longer are. Those
transactions presumably were confirmed in a block or were evicted
from the mempool.
**Notification Example**
::
**TODO**
mempool.get_fee_histogram
=========================

View File

@ -212,3 +212,164 @@ bandwidth-intensive request.
concatenated together. As many headers as are available at the
implied starting height will be returned; this may range from zero
to the coin-specific chunk size.
blockchain.scripthash.get_history
=================================
Return the confirmed and unconfirmed history of a :ref:`script hash
<script hashes>`.
**Signature**
.. function:: blockchain.scripthash.get_history(scripthash)
.. versionadded:: 1.1
*scripthash*
The script hash as a hexadecimal string.
**Result**
A list of confirmed transactions in blockchain order, with the
output of :func:`blockchain.scripthash.get_mempool` appended to the
list. Each confirmed transaction is a dictionary with the following
keys:
* *height*
The integer height of the block the transaction was confirmed in.
* *tx_hash*
The transaction hash in hexadecimal.
See :func:`blockchain.scripthash.get_mempool` for how mempool
transactions are returned.
**Result Examples**
::
[
{
"height": 200004,
"tx_hash": "acc3758bd2a26f869fcc67d48ff30b96464d476bca82c1cd6656e7d506816412"
},
{
"height": 215008,
"tx_hash": "f3e1bf48975b8d6060a9de8884296abb80be618dc00ae3cb2f6cee3085e09403"
}
]
::
[
{
"fee": 20000,
"height": 0,
"tx_hash": "9fbed79a1e970343fcd39f4a2d830a6bde6de0754ed2da70f489d0303ed558ec"
}
]
blockchain.scripthash.listunspent
=================================
Return an ordered list of UTXOs sent to a script hash.
**Signature**
.. function:: blockchain.scripthash.listunspent(scripthash)
.. versionadded:: 1.1
*scripthash*
The script hash as a hexadecimal string.
**Result**
A list of unspent outputs in blockchain order. This function takes
the mempool into account. Mempool transactions paying to the
address are included at the end of the list in an undefined order.
Any output that is spent in the mempool does not appear. Each
output is a dictionary with the following keys:
* *height*
The integer height of the block the transaction was confirmed in.
``0`` if the transaction is in the mempool.
* *tx_pos*
The zero-based index of the output in the transaction's list of
outputs.
* *tx_hash*
The output's transaction hash as a hexadecimal string.
* *value*
The output's value in minimum coin units (satoshis).
**Result Example**
::
[
{
"tx_pos": 0,
"value": 45318048,
"tx_hash": "9f2c45a12db0144909b5db269415f7319179105982ac70ed80d76ea79d923ebf",
"height": 437146
},
{
"tx_pos": 0,
"value": 919195,
"tx_hash": "3d2290c93436a3e964cfc2f0950174d8847b1fbe3946432c4784e168da0f019f",
"height": 441696
}
]
blockchain.scripthash.get_mempool
=================================
Return the unconfirmed transactions of a :ref:`script hash <script
hashes>`.
**Signature**
.. function:: blockchain.scripthash.get_mempool(scripthash)
.. versionadded:: 1.1
*scripthash*
The script hash as a hexadecimal string.
**Result**
A list of mempool transactions in arbitrary order. Each mempool
transaction is a dictionary with the following keys:
* *height*
``0`` if all inputs are confirmed, and ``-1`` otherwise.
* *tx_hash*
The transaction hash in hexadecimal.
* *fee*
The transaction fee in minimum coin units (satoshis).
**Result Example**
::
[
{
"tx_hash": "45381031132c57b2ff1cbe8d8d3920cf9ed25efd9a0beb764bdb2f24c7d1c7e3",
"height": 0,
"fee": 24310
}
]

View File

@ -1,4 +1,4 @@
version = 'ElectrumX 1.8.2'
version = 'ElectrumX 1.8.3-dev'
version_short = version.split()[-1]
from electrumx.server.controller import Controller

View File

@ -211,6 +211,14 @@ class Coin(object):
'''
return ScriptPubKey.P2PK_script(pubkey)
@classmethod
def hash160_to_P2PKH_script(cls, hash160):
    '''Return the standard P2PKH output script paying to *hash160*.

    hash160: the HASH160 of a public key (presumably 20 bytes, as
    pay_to_address_script slices the trailing 20 bytes of a decoded
    address -- not validated here).
    '''
    return ScriptPubKey.P2PKH_script(hash160)
@classmethod
def hash160_to_P2PKH_hashX(cls, hash160):
    '''Return the hashX index key for the P2PKH script paying to
    *hash160*, by building the script and hashing it with
    cls.hashX_from_script.'''
    return cls.hashX_from_script(cls.hash160_to_P2PKH_script(hash160))
@classmethod
def pay_to_address_script(cls, address):
'''Return a pubkey script that pays to a pubkey hash.
@ -223,12 +231,12 @@ class Coin(object):
verbyte = -1
verlen = len(raw) - 20
if verlen > 0:
verbyte, hash_bytes = raw[:verlen], raw[verlen:]
verbyte, hash160 = raw[:verlen], raw[verlen:]
if verbyte == cls.P2PKH_VERBYTE:
return ScriptPubKey.P2PKH_script(hash_bytes)
return cls.hash160_to_P2PKH_script(hash160)
if verbyte in cls.P2SH_VERBYTES:
return ScriptPubKey.P2SH_script(hash_bytes)
return ScriptPubKey.P2SH_script(hash160)
raise CoinError('invalid address: {}'.format(address))

View File

@ -28,6 +28,8 @@
from math import ceil, log
from aiorpcx import Event
from electrumx.lib.hash import double_sha256
@ -158,13 +160,17 @@ class Merkle(object):
class MerkleCache(object):
'''A cache to calculate merkle branches efficiently.'''
def __init__(self, merkle, source, length):
'''Initialise a cache of length hashes taken from source.'''
def __init__(self, merkle, source_func):
'''Initialise a cache hashes taken from source_func:
async def source_func(index, count):
...
'''
self.merkle = merkle
self.source = source
self.length = length
self.depth_higher = merkle.tree_depth(length) // 2
self.level = self._level(source.hashes(0, length))
self.source_func = source_func
self.length = 0
self.depth_higher = 0
self.initialized = Event()
def _segment_length(self):
return 1 << self.depth_higher
@ -179,18 +185,18 @@ class MerkleCache(object):
def _level(self, hashes):
return self.merkle.level(hashes, self.depth_higher)
def _extend_to(self, length):
async def _extend_to(self, length):
'''Extend the length of the cache if necessary.'''
if length <= self.length:
return
# Start from the beginning of any final partial segment.
# Retain the value of depth_higher; in practice this is fine
start = self._leaf_start(self.length)
hashes = self.source.hashes(start, length - start)
hashes = await self.source_func(start, length - start)
self.level[start >> self.depth_higher:] = self._level(hashes)
self.length = length
def _level_for(self, length):
async def _level_for(self, length):
'''Return a (level_length, final_hash) pair for a truncation
of the hashes to the given length.'''
if length == self.length:
@ -198,10 +204,17 @@ class MerkleCache(object):
level = self.level[:length >> self.depth_higher]
leaf_start = self._leaf_start(length)
count = min(self._segment_length(), length - leaf_start)
hashes = self.source.hashes(leaf_start, count)
hashes = await self.source_func(leaf_start, count)
level += self._level(hashes)
return level
async def initialize(self, length):
'''Call to initialize the cache to a source of given length.'''
self.length = length
self.depth_higher = self.merkle.tree_depth(length) // 2
self.level = self._level(await self.source_func(0, length))
self.initialized.set()
def truncate(self, length):
'''Truncate the cache so it covers no more than length underlying
hashes.'''
@ -215,7 +228,7 @@ class MerkleCache(object):
self.length = length
self.level[length >> self.depth_higher:] = []
def branch_and_root(self, length, index):
async def branch_and_root(self, length, index):
'''Return a merkle branch and root. Length is the number of
hashes used to calculate the merkle root, index is the position
of the hash to calculate the branch of.
@ -229,12 +242,13 @@ class MerkleCache(object):
raise ValueError('length must be positive')
if index >= length:
raise ValueError('index must be less than length')
self._extend_to(length)
await self.initialized.wait()
await self._extend_to(length)
leaf_start = self._leaf_start(index)
count = min(self._segment_length(), length - leaf_start)
leaf_hashes = self.source.hashes(leaf_start, count)
leaf_hashes = await self.source_func(leaf_start, count)
if length < self._segment_length():
return self.merkle.branch_and_root(leaf_hashes, index)
level = self._level_for(length)
level = await self._level_for(length)
return self.merkle.branch_and_root_from_level(
level, leaf_hashes, index, self.depth_higher)

View File

@ -93,28 +93,21 @@ class ServerBase(object):
loop.set_exception_handler(self.on_exception)
shutdown_event = asyncio.Event()
try:
async with TaskGroup() as group:
server_task = await group.spawn(self.serve(shutdown_event))
# Wait for shutdown, log on receipt of the event
await shutdown_event.wait()
self.logger.info('shutting down')
server_task.cancel()
finally:
await loop.shutdown_asyncgens()
async with TaskGroup() as group:
server_task = await group.spawn(self.serve(shutdown_event))
# Wait for shutdown, log on receipt of the event
await shutdown_event.wait()
self.logger.info('shutting down')
server_task.cancel()
# Prevent some silly logs
await asyncio.sleep(0.001)
# Finally, work around an apparent asyncio bug that causes log
# spew on shutdown for partially opened SSL sockets
try:
del asyncio.sslproto._SSLProtocolTransport.__del__
except Exception:
pass
await asyncio.sleep(0.01)
self.logger.info('shutdown complete')
def run(self):
loop = asyncio.get_event_loop()
loop.run_until_complete(self._main(loop))
loop.close()
try:
loop.run_until_complete(self._main(loop))
finally:
loop.run_until_complete(loop.shutdown_asyncgens())

View File

@ -20,9 +20,8 @@ from aiorpcx import TaskGroup, run_in_thread
import electrumx
from electrumx.server.daemon import DaemonError
from electrumx.lib.hash import hash_to_hex_str, HASHX_LEN
from electrumx.lib.merkle import Merkle, MerkleCache
from electrumx.lib.util import chunks, formatted_time, class_logger
import electrumx.server.db
from electrumx.lib.util import chunks, class_logger
from electrumx.server.db import FlushData
class Prefetcher(object):
@ -56,7 +55,7 @@ class Prefetcher(object):
if not await self._prefetch_blocks():
await asyncio.sleep(5)
except DaemonError as e:
self.logger.info('ignoring daemon error: {}'.format(e))
self.logger.info(f'ignoring daemon error: {e}')
def get_prefetched_blocks(self):
'''Called by block processor when it is processing queued blocks.'''
@ -139,43 +138,33 @@ class Prefetcher(object):
return True
class HeaderSource(object):
def __init__(self, db):
self.hashes = db.fs_block_hashes
class ChainError(Exception):
'''Raised on error processing blocks.'''
class BlockProcessor(electrumx.server.db.DB):
class BlockProcessor(object):
'''Process blocks and update the DB state to match.
Employ a prefetcher to prefetch blocks in batches for processing.
Coordinate backing up in case of chain reorganisations.
'''
def __init__(self, env, daemon, notifications):
super().__init__(env)
def __init__(self, env, db, daemon, notifications):
self.env = env
self.db = db
self.daemon = daemon
self.notifications = notifications
self.coin = env.coin
self.blocks_event = asyncio.Event()
self.prefetcher = Prefetcher(daemon, env.coin, self.blocks_event)
self.logger = class_logger(__name__, self.__class__.__name__)
# Meta
self.cache_MB = env.cache_MB
self.next_cache_check = 0
self.last_flush = time.time()
self.touched = set()
self.reorg_count = 0
# Header merkle cache
self.merkle = Merkle()
self.header_mc = None
# Caches of unflushed items.
self.headers = []
self.tx_hashes = []
@ -189,7 +178,7 @@ class BlockProcessor(electrumx.server.db.DB):
# is consistent with self.height
self.state_lock = asyncio.Lock()
async def run_in_thread_shielded(self, func, *args):
async def run_in_thread_with_lock(self, func, *args):
# Run in a thread to prevent blocking. Shielded so that
# cancellations from shutdown don't lose work - when the task
# completes the data will be flushed and then we shut down.
@ -214,7 +203,14 @@ class BlockProcessor(electrumx.server.db.DB):
chain = [self.tip] + [self.coin.header_hash(h) for h in headers[:-1]]
if hprevs == chain:
await self.run_in_thread_shielded(self.advance_blocks, blocks)
start = time.time()
await self.run_in_thread_with_lock(self.advance_blocks, blocks)
await self._maybe_flush()
if not self.db.first_sync:
s = '' if len(blocks) == 1 else 's'
self.logger.info('processed {:,d} block{} in {:.1f}s'
.format(len(blocks), s,
time.time() - start))
if self._caught_up_event.is_set():
await self.notifications.on_block(self.touched, self.height)
self.touched = set()
@ -239,7 +235,7 @@ class BlockProcessor(electrumx.server.db.DB):
self.logger.info('chain reorg detected')
else:
self.logger.info(f'faking a reorg of {count:,d} blocks')
await run_in_thread(self.flush, True)
await self.flush(True)
async def get_raw_blocks(last_height, hex_hashes):
heights = range(last_height, last_height - len(hex_hashes), -1)
@ -250,17 +246,20 @@ class BlockProcessor(electrumx.server.db.DB):
except Exception:
return await self.daemon.raw_blocks(hex_hashes)
def flush_backup():
# self.touched can include other addresses which is
# harmless, but remove None.
self.touched.discard(None)
self.db.flush_backup(self.flush_data(), self.touched)
start, last, hashes = await self.reorg_hashes(count)
# Reverse and convert to hex strings.
hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)]
for hex_hashes in chunks(hashes, 50):
raw_blocks = await get_raw_blocks(last, hex_hashes)
await self.run_in_thread_shielded(self.backup_blocks, raw_blocks)
await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks)
await self.run_in_thread_with_lock(flush_backup)
last -= len(raw_blocks)
# Truncate header_mc: header count is 1 more than the height.
# Note header_mc is None if the reorg happens at startup.
if self.header_mc:
self.header_mc.truncate(self.height + 1)
await self.prefetcher.reset_height(self.height)
async def reorg_hashes(self, count):
@ -276,7 +275,7 @@ class BlockProcessor(electrumx.server.db.DB):
self.logger.info(f'chain was reorganised replacing {count:,d} '
f'block{s} at heights {start:,d}-{last:,d}')
return start, last, self.fs_block_hashes(start, count)
return start, last, await self.db.fs_block_hashes(start, count)
async def calc_reorg_range(self, count):
'''Calculate the reorg range'''
@ -294,7 +293,7 @@ class BlockProcessor(electrumx.server.db.DB):
start = self.height - 1
count = 1
while start > 0:
hashes = self.fs_block_hashes(start, count)
hashes = await self.db.fs_block_hashes(start, count)
hex_hashes = [hash_to_hex_str(hash) for hash in hashes]
d_hex_hashes = await self.daemon.block_hex_hashes(start, count)
n = diff_pos(hex_hashes, d_hex_hashes)
@ -310,133 +309,40 @@ class BlockProcessor(electrumx.server.db.DB):
return start, count
def flush_state(self, batch):
'''Flush chain state to the batch.'''
now = time.time()
self.wall_time += now - self.last_flush
self.last_flush = now
self.last_flush_tx_count = self.tx_count
self.write_utxo_state(batch)
def estimate_txs_remaining(self):
# Try to estimate how many txs there are to go
daemon_height = self.daemon.cached_height()
coin = self.coin
tail_count = daemon_height - max(self.height, coin.TX_COUNT_HEIGHT)
# Damp the initial enthusiasm
realism = max(2.0 - 0.9 * self.height / coin.TX_COUNT_HEIGHT, 1.0)
return (tail_count * coin.TX_PER_BLOCK +
max(coin.TX_COUNT - self.tx_count, 0)) * realism
def assert_flushed(self):
'''Asserts state is fully flushed.'''
assert self.tx_count == self.fs_tx_count == self.db_tx_count
assert self.height == self.fs_height == self.db_height
assert not self.undo_infos
assert not self.utxo_cache
assert not self.db_deletes
self.history.assert_flushed()
# - Flushing
def flush_data(self):
'''The data for a flush. The lock must be taken.'''
assert self.state_lock.locked()
return FlushData(self.height, self.tx_count, self.headers,
self.tx_hashes, self.undo_infos, self.utxo_cache,
self.db_deletes, self.tip)
def flush(self, flush_utxos=False):
'''Flush out cached state.
async def flush(self, flush_utxos):
def flush():
self.db.flush_dbs(self.flush_data(), flush_utxos,
self.estimate_txs_remaining)
await self.run_in_thread_with_lock(flush)
History is always flushed. UTXOs are flushed if flush_utxos.'''
if self.height == self.db_height:
self.assert_flushed()
return
flush_start = time.time()
last_flush = self.last_flush
tx_diff = self.tx_count - self.last_flush_tx_count
# Flush to file system
self.fs_flush()
fs_end = time.time()
if self.utxo_db.for_sync:
self.logger.info('flushed to FS in {:.1f}s'
.format(fs_end - flush_start))
# History next - it's fast and frees memory
hashX_count = self.history.flush()
if self.utxo_db.for_sync:
self.logger.info('flushed history in {:.1f}s for {:,d} addrs'
.format(time.time() - fs_end, hashX_count))
# Flush state last as it reads the wall time.
with self.utxo_db.write_batch() as batch:
if flush_utxos:
self.flush_utxos(batch)
self.flush_state(batch)
# Update and put the wall time again - otherwise we drop the
# time it took to commit the batch
self.flush_state(self.utxo_db)
self.logger.info('flush #{:,d} took {:.1f}s. Height {:,d} txs: {:,d}'
.format(self.history.flush_count,
self.last_flush - flush_start,
self.height, self.tx_count))
# Catch-up stats
if self.utxo_db.for_sync:
tx_per_sec = int(self.tx_count / self.wall_time)
this_tx_per_sec = 1 + int(tx_diff / (self.last_flush - last_flush))
self.logger.info('tx/sec since genesis: {:,d}, '
'since last flush: {:,d}'
.format(tx_per_sec, this_tx_per_sec))
daemon_height = self.daemon.cached_height()
if self.height > self.coin.TX_COUNT_HEIGHT:
tx_est = (daemon_height - self.height) * self.coin.TX_PER_BLOCK
else:
tx_est = ((daemon_height - self.coin.TX_COUNT_HEIGHT)
* self.coin.TX_PER_BLOCK
+ (self.coin.TX_COUNT - self.tx_count))
# Damp the enthusiasm
realism = 2.0 - 0.9 * self.height / self.coin.TX_COUNT_HEIGHT
tx_est *= max(realism, 1.0)
self.logger.info('sync time: {} ETA: {}'
.format(formatted_time(self.wall_time),
formatted_time(tx_est / this_tx_per_sec)))
def fs_flush(self):
'''Flush the things stored on the filesystem.'''
assert self.fs_height + len(self.headers) == self.height
assert self.tx_count == self.tx_counts[-1] if self.tx_counts else 0
self.fs_update(self.fs_height, self.headers, self.tx_hashes)
self.fs_height = self.height
self.fs_tx_count = self.tx_count
self.tx_hashes = []
self.headers = []
def backup_flush(self):
'''Like flush() but when backing up. All UTXOs are flushed.
hashXs - sequence of hashXs which were touched by backing
up. Searched for history entries to remove after the backup
height.
'''
assert self.height < self.db_height
self.history.assert_flushed()
flush_start = time.time()
# Backup FS (just move the pointers back)
self.fs_height = self.height
self.fs_tx_count = self.tx_count
assert not self.headers
assert not self.tx_hashes
# Backup history. self.touched can include other addresses
# which is harmless, but remove None.
self.touched.discard(None)
nremoves = self.history.backup(self.touched, self.tx_count)
self.logger.info('backing up removed {:,d} history entries'
.format(nremoves))
with self.utxo_db.write_batch() as batch:
# Flush state last as it reads the wall time.
self.flush_utxos(batch)
self.flush_state(batch)
self.logger.info('backup flush #{:,d} took {:.1f}s. '
'Height {:,d} txs: {:,d}'
.format(self.history.flush_count,
self.last_flush - flush_start,
self.height, self.tx_count))
async def _maybe_flush(self):
# If caught up, flush everything as client queries are
# performed on the DB.
if self._caught_up_event.is_set():
await self.flush(True)
elif time.time() > self.next_cache_check:
flush_arg = self.check_cache_size()
if flush_arg is not None:
await self.flush(flush_arg)
self.next_cache_check = time.time() + 30
def check_cache_size(self):
'''Flush a cache if it gets too big.'''
@ -445,10 +351,10 @@ class BlockProcessor(electrumx.server.db.DB):
one_MB = 1000*1000
utxo_cache_size = len(self.utxo_cache) * 205
db_deletes_size = len(self.db_deletes) * 57
hist_cache_size = self.history.unflushed_memsize()
hist_cache_size = self.db.history.unflushed_memsize()
# Roughly ntxs * 32 + nblocks * 42
tx_hash_size = ((self.tx_count - self.fs_tx_count) * 32
+ (self.height - self.fs_height) * 42)
tx_hash_size = ((self.tx_count - self.db.fs_tx_count) * 32
+ (self.height - self.db.fs_height) * 42)
utxo_MB = (db_deletes_size + utxo_cache_size) // one_MB
hist_MB = (hist_cache_size + tx_hash_size) // one_MB
@ -459,16 +365,17 @@ class BlockProcessor(electrumx.server.db.DB):
# Flush history if it takes up over 20% of cache memory.
# Flush UTXOs once they take up 80% of cache memory.
if utxo_MB + hist_MB >= self.cache_MB or hist_MB >= self.cache_MB // 5:
self.flush(utxo_MB >= self.cache_MB * 4 // 5)
cache_MB = self.env.cache_MB
if utxo_MB + hist_MB >= cache_MB or hist_MB >= cache_MB // 5:
return utxo_MB >= cache_MB * 4 // 5
return None
def advance_blocks(self, blocks):
'''Synchronously advance the blocks.
It is already verified they correctly connect onto our tip.
'''
start = time.time()
min_height = self.min_undo_height(self.daemon.cached_height())
min_height = self.db.min_undo_height(self.daemon.cached_height())
height = self.height
for block in blocks:
@ -476,28 +383,13 @@ class BlockProcessor(electrumx.server.db.DB):
undo_info = self.advance_txs(block.transactions)
if height >= min_height:
self.undo_infos.append((undo_info, height))
self.write_raw_block(block.raw, height)
self.db.write_raw_block(block.raw, height)
headers = [block.header for block in blocks]
self.height = height
self.headers.extend(headers)
self.tip = self.coin.header_hash(headers[-1])
# If caught up, flush everything as client queries are
# performed on the DB.
if self._caught_up_event.is_set():
self.flush(True)
else:
if time.time() > self.next_cache_check:
self.check_cache_size()
self.next_cache_check = time.time() + 30
if not self.first_sync:
s = '' if len(blocks) == 1 else 's'
self.logger.info('processed {:,d} block{} in {:.1f}s'
.format(len(blocks), s,
time.time() - start))
def advance_txs(self, txs):
self.tx_hashes.append(b''.join(tx_hash for tx, tx_hash in txs))
@ -538,10 +430,10 @@ class BlockProcessor(electrumx.server.db.DB):
update_touched(hashXs)
tx_num += 1
self.history.add_unflushed(hashXs_by_tx, self.tx_count)
self.db.history.add_unflushed(hashXs_by_tx, self.tx_count)
self.tx_count = tx_num
self.tx_counts.append(tx_num)
self.db.tx_counts.append(tx_num)
return undo_info
@ -551,7 +443,7 @@ class BlockProcessor(electrumx.server.db.DB):
The blocks should be in order of decreasing height, starting at.
self.height. A flush is performed once the blocks are backed up.
'''
self.assert_flushed()
self.db.assert_flushed(self.flush_data())
assert self.height >= len(raw_blocks)
coin = self.coin
@ -567,15 +459,14 @@ class BlockProcessor(electrumx.server.db.DB):
self.tip = coin.header_prevhash(block.header)
self.backup_txs(block.transactions)
self.height -= 1
self.tx_counts.pop()
self.db.tx_counts.pop()
self.logger.info('backed up to height {:,d}'.format(self.height))
self.backup_flush()
def backup_txs(self, txs):
# Prevout values, in order down the block (coinbase first if present)
# undo_info is in reverse block order
undo_info = self.read_undo_info(self.height)
undo_info = self.db.read_undo_info(self.height)
if undo_info is None:
raise ChainError('no undo information found for height {:,d}'
.format(self.height))
@ -683,14 +574,14 @@ class BlockProcessor(electrumx.server.db.DB):
# Value: hashX
prefix = b'h' + tx_hash[:4] + idx_packed
candidates = {db_key: hashX for db_key, hashX
in self.utxo_db.iterator(prefix=prefix)}
in self.db.utxo_db.iterator(prefix=prefix)}
for hdb_key, hashX in candidates.items():
tx_num_packed = hdb_key[-4:]
if len(candidates) > 1:
tx_num, = unpack('<I', tx_num_packed)
hash, height = self.fs_tx_hash(tx_num)
hash, height = self.db.fs_tx_hash(tx_num)
if hash != tx_hash:
assert hash is not None # Should always be found
continue
@ -698,7 +589,7 @@ class BlockProcessor(electrumx.server.db.DB):
# Key: b'u' + address_hashX + tx_idx + tx_num
# Value: the UTXO value as a 64-bit unsigned integer
udb_key = b'u' + hashX + hdb_key[-6:]
utxo_value_packed = self.utxo_db.get(udb_key)
utxo_value_packed = self.db.utxo_db.get(udb_key)
if utxo_value_packed:
# Remove both entries for this UTXO
self.db_deletes.append(hdb_key)
@ -708,48 +599,6 @@ class BlockProcessor(electrumx.server.db.DB):
raise ChainError('UTXO {} / {:,d} not found in "h" table'
.format(hash_to_hex_str(tx_hash), tx_idx))
def flush_utxos(self, batch):
'''Flush the cached DB writes and UTXO set to the batch.'''
# Care is needed because the writes generated by flushing the
# UTXO state may have keys in common with our write cache or
# may be in the DB already.
flush_start = time.time()
delete_count = len(self.db_deletes) // 2
utxo_cache_len = len(self.utxo_cache)
# Spends
batch_delete = batch.delete
for key in sorted(self.db_deletes):
batch_delete(key)
self.db_deletes = []
# New UTXOs
batch_put = batch.put
for cache_key, cache_value in self.utxo_cache.items():
# suffix = tx_idx + tx_num
hashX = cache_value[:-12]
suffix = cache_key[-2:] + cache_value[-12:-8]
batch_put(b'h' + cache_key[:4] + suffix, hashX)
batch_put(b'u' + hashX + suffix, cache_value[-8:])
self.utxo_cache = {}
# New undo information
self.flush_undo_infos(batch_put, self.undo_infos)
self.undo_infos = []
if self.utxo_db.for_sync:
self.logger.info('flushed {:,d} blocks with {:,d} txs, {:,d} UTXO '
'adds, {:,d} spends in {:.1f}s, committing...'
.format(self.height - self.db_height,
self.tx_count - self.db_tx_count,
utxo_cache_len, delete_count,
time.time() - flush_start))
self.utxo_flush_count = self.history.flush_count
self.db_tx_count = self.tx_count
self.db_height = self.height
self.db_tip = self.tip
async def _process_prefetched_blocks(self):
'''Loop forever processing blocks as they arrive.'''
while True:
@ -769,35 +618,22 @@ class BlockProcessor(electrumx.server.db.DB):
async def _first_caught_up(self):
self.logger.info(f'caught up to height {self.height}')
# Flush everything but with first_sync->False state.
first_sync = self.first_sync
self.first_sync = False
self.flush(True)
first_sync = self.db.first_sync
self.db.first_sync = False
await self.flush(True)
if first_sync:
self.logger.info(f'{electrumx.version} synced to '
f'height {self.height:,d}')
# Initialise the notification framework
await self.notifications.on_block(set(), self.height)
# Reopen for serving
await self.open_for_serving()
# Populate the header merkle cache
length = max(1, self.height - self.env.reorg_limit)
self.header_mc = MerkleCache(self.merkle, HeaderSource(self), length)
self.logger.info('populated header merkle cache')
await self.db.open_for_serving()
async def _first_open_dbs(self):
await self.open_for_sync()
# An incomplete compaction needs to be cancelled otherwise
# restarting it will corrupt the history
self.history.cancel_compaction()
# These are our state as we move ahead of DB state
self.fs_height = self.db_height
self.fs_tx_count = self.db_tx_count
self.height = self.db_height
self.tip = self.db_tip
self.tx_count = self.db_tx_count
self.last_flush_tx_count = self.tx_count
if self.utxo_db.for_sync:
self.logger.info(f'flushing DB cache at {self.cache_MB:,d} MB')
await self.db.open_for_sync()
self.height = self.db.db_height
self.tip = self.db.db_tip
self.tx_count = self.db.db_tx_count
# --- External API
@ -823,10 +659,9 @@ class BlockProcessor(electrumx.server.db.DB):
await group.spawn(self.prefetcher.main_loop(self.height))
await group.spawn(self._process_prefetched_blocks())
finally:
async with self.state_lock:
# Shut down block processing
self.logger.info('flushing to DB for a clean shutdown...')
self.flush(True)
# Shut down block processing
self.logger.info('flushing to DB for a clean shutdown...')
await self.flush(True)
def force_chain_reorg(self, count):
'''Force a reorg of the given number of blocks.

View File

@ -6,8 +6,6 @@
# and warranty status of this software.
from aiorpcx import run_in_thread
from electrumx.lib.hash import hash_to_hex_str
@ -16,15 +14,18 @@ class ChainState(object):
blocks, transaction history, UTXOs and the mempool.
'''
def __init__(self, env, daemon, bp):
def __init__(self, env, db, daemon, bp):
self._env = env
self._db = db
self._daemon = daemon
self._bp = bp
# External interface pass-throughs for session.py
self.force_chain_reorg = self._bp.force_chain_reorg
self.tx_branch_and_root = self._bp.merkle.branch_and_root
self.read_headers = self._bp.read_headers
self.force_chain_reorg = bp.force_chain_reorg
self.tx_branch_and_root = db.merkle.branch_and_root
self.read_headers = db.read_headers
self.all_utxos = db.all_utxos
self.limited_history = db.limited_history
self.header_branch_and_root = db.header_branch_and_root
async def broadcast_transaction(self, raw_tx):
return await self._daemon.sendrawtransaction([raw_tx])
@ -33,7 +34,7 @@ class ChainState(object):
return await getattr(self._daemon, method)(*args)
def db_height(self):
return self._bp.db_height
return self._db.db_height
def get_info(self):
'''Chain state info for LocalRPC and logs.'''
@ -43,35 +44,9 @@ class ChainState(object):
'db_height': self.db_height(),
}
async def get_history(self, hashX):
'''Get history asynchronously to reduce latency.'''
def job():
# History DoS limit. Each element of history is about 99
# bytes when encoded as JSON. This limits resource usage
# on bloated history requests, and uses a smaller divisor
# so large requests are logged before refusing them.
limit = self._env.max_send // 97
return list(self._bp.get_history(hashX, limit=limit))
return await run_in_thread(job)
async def get_utxos(self, hashX):
'''Get UTXOs asynchronously to reduce latency.'''
def job():
return list(self._bp.get_utxos(hashX, limit=None))
return await run_in_thread(job)
def header_branch_and_root(self, length, height):
return self._bp.header_mc.branch_and_root(length, height)
def processing_new_block(self):
'''Return True if we're processing a new block.'''
return self._daemon.cached_height() > self.db_height()
def raw_header(self, height):
async def raw_header(self, height):
'''Return the binary header at the given height.'''
header, n = self._bp.read_headers(height, 1)
header, n = await self.read_headers(height, 1)
if n != 1:
raise IndexError(f'height {height:,d} out of range')
return header
@ -82,7 +57,7 @@ class ChainState(object):
async def query(self, args, limit):
coin = self._env.coin
db = self._bp
db = self._db
lines = []
def arg_to_hashX(arg):
@ -102,22 +77,25 @@ class ChainState(object):
if not hashX:
continue
n = None
for n, (tx_hash, height) in enumerate(
db.get_history(hashX, limit), start=1):
history = await db.limited_history(hashX, limit=limit)
for n, (tx_hash, height) in enumerate(history):
lines.append(f'History #{n:,d}: height {height:,d} '
f'tx_hash {hash_to_hex_str(tx_hash)}')
if n is None:
lines.append('No history found')
n = None
for n, utxo in enumerate(db.get_utxos(hashX, limit), start=1):
utxos = await db.all_utxos(hashX)
for n, utxo in enumerate(utxos, start=1):
lines.append(f'UTXO #{n:,d}: tx_hash '
f'{hash_to_hex_str(utxo.tx_hash)} '
f'tx_pos {utxo.tx_pos:,d} height '
f'{utxo.height:,d} value {utxo.value:,d}')
if n == limit:
break
if n is None:
lines.append('No UTXOs found')
balance = db.get_balance(hashX)
balance = sum(utxo.value for utxo in utxos)
lines.append(f'Balance: {coin.decimal_value(balance):,f} '
f'{coin.SHORTNAME}')

View File

@ -13,7 +13,8 @@ import electrumx
from electrumx.lib.server_base import ServerBase
from electrumx.lib.util import version_string
from electrumx.server.chain_state import ChainState
from electrumx.server.mempool import MemPool
from electrumx.server.db import DB
from electrumx.server.mempool import MemPool, MemPoolAPI
from electrumx.server.session import SessionManager
@ -93,10 +94,21 @@ class Controller(ServerBase):
notifications = Notifications()
daemon = env.coin.DAEMON(env)
db = DB(env)
BlockProcessor = env.coin.BLOCK_PROCESSOR
bp = BlockProcessor(env, daemon, notifications)
mempool = MemPool(env.coin, daemon, notifications, bp.lookup_utxos)
chain_state = ChainState(env, daemon, bp)
bp = BlockProcessor(env, db, daemon, notifications)
chain_state = ChainState(env, db, daemon, bp)
# Set ourselves up to implement the MemPoolAPI
self.height = daemon.height
self.cached_height = daemon.cached_height
self.mempool_hashes = daemon.mempool_hashes
self.raw_transactions = daemon.getrawtransactions
self.lookup_utxos = db.lookup_utxos
self.on_mempool = notifications.on_mempool
MemPoolAPI.register(Controller)
mempool = MemPool(env.coin, self)
session_mgr = SessionManager(env, chain_state, mempool,
notifications, shutdown_event)
@ -108,6 +120,7 @@ class Controller(ServerBase):
await group.spawn(session_mgr.serve(serve_externally_event))
await group.spawn(bp.fetch_and_process_blocks(caught_up_event))
await caught_up_event.wait()
await group.spawn(db.populate_header_merkle_cache())
await group.spawn(mempool.keep_synchronized(synchronized_event))
await synchronized_event.wait()
serve_externally_event.set()

View File

@ -12,15 +12,19 @@
import array
import ast
import os
import time
from bisect import bisect_right
from collections import namedtuple
from glob import glob
from struct import pack, unpack
import attr
from aiorpcx import run_in_thread
import electrumx.lib.util as util
from electrumx.lib.hash import hash_to_hex_str, HASHX_LEN
from electrumx.lib.merkle import Merkle, MerkleCache
from electrumx.lib.util import formatted_time
from electrumx.server.storage import db_class
from electrumx.server.history import History
@ -28,6 +32,19 @@ from electrumx.server.history import History
UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")
@attr.s(slots=True)
class FlushData(object):
height = attr.ib()
tx_count = attr.ib()
headers = attr.ib()
block_tx_hashes = attr.ib()
# The following are flushed to the UTXO DB if undo_infos is not None
undo_infos = attr.ib()
adds = attr.ib()
deletes = attr.ib()
tip = attr.ib()
class DB(object):
'''Simple wrapper of the backend database for querying.
@ -60,9 +77,14 @@ class DB(object):
self.history = History()
self.utxo_db = None
self.tx_counts = None
self.last_flush = time.time()
self.logger.info(f'using {self.env.db_engine} for DB backend')
# Header merkle cache
self.merkle = Merkle()
self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes)
self.headers_file = util.LogicalFile('meta/headers', 2, 16000000)
self.tx_counts_file = util.LogicalFile('meta/txcounts', 2, 2000000)
self.hashes_file = util.LogicalFile('meta/hashes', 4, 16000000)
@ -84,7 +106,7 @@ class DB(object):
else:
assert self.db_tx_count == 0
async def _open_dbs(self, for_sync):
async def _open_dbs(self, for_sync, compacting):
assert self.utxo_db is None
# First UTXO DB
@ -104,12 +126,16 @@ class DB(object):
# Then history DB
self.utxo_flush_count = self.history.open_db(self.db_class, for_sync,
self.utxo_flush_count)
self.utxo_flush_count,
compacting)
self.clear_excess_undo_info()
# Read TX counts (requires meta directory)
await self._read_tx_counts()
async def open_for_compacting(self):
await self._open_dbs(True, True)
async def open_for_sync(self):
'''Open the databases to sync to the daemon.
@ -117,7 +143,7 @@ class DB(object):
synchronization. When serving clients we want the open files for
serving network connections.
'''
await self._open_dbs(True)
await self._open_dbs(True, False)
async def open_for_serving(self):
'''Open the databases for serving. If they are already open they are
@ -128,7 +154,192 @@ class DB(object):
self.utxo_db.close()
self.history.close_db()
self.utxo_db = None
await self._open_dbs(False)
await self._open_dbs(False, False)
# Header merkle cache
async def populate_header_merkle_cache(self):
self.logger.info('populating header merkle cache...')
length = max(1, self.db_height - self.env.reorg_limit)
start = time.time()
await self.header_mc.initialize(length)
elapsed = time.time() - start
self.logger.info(f'header merkle cache populated in {elapsed:.1f}s')
async def header_branch_and_root(self, length, height):
return await self.header_mc.branch_and_root(length, height)
# Flushing
def assert_flushed(self, flush_data):
'''Asserts state is fully flushed.'''
assert flush_data.tx_count == self.fs_tx_count == self.db_tx_count
assert flush_data.height == self.fs_height == self.db_height
assert flush_data.tip == self.db_tip
assert not flush_data.headers
assert not flush_data.block_tx_hashes
assert not flush_data.adds
assert not flush_data.deletes
assert not flush_data.undo_infos
self.history.assert_flushed()
def flush_dbs(self, flush_data, flush_utxos, estimate_txs_remaining):
'''Flush out cached state. History is always flushed; UTXOs are
flushed if flush_utxos.'''
if flush_data.height == self.db_height:
self.assert_flushed(flush_data)
return
start_time = time.time()
prior_flush = self.last_flush
tx_delta = flush_data.tx_count - self.last_flush_tx_count
# Flush to file system
self.flush_fs(flush_data)
# Then history
self.flush_history()
# Flush state last as it reads the wall time.
with self.utxo_db.write_batch() as batch:
if flush_utxos:
self.flush_utxo_db(batch, flush_data)
self.flush_state(batch)
# Update and put the wall time again - otherwise we drop the
# time it took to commit the batch
self.flush_state(self.utxo_db)
elapsed = self.last_flush - start_time
self.logger.info(f'flush #{self.history.flush_count:,d} took '
f'{elapsed:.1f}s. Height {flush_data.height:,d} '
f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')
# Catch-up stats
if self.utxo_db.for_sync:
flush_interval = self.last_flush - prior_flush
tx_per_sec_gen = int(flush_data.tx_count / self.wall_time)
tx_per_sec_last = 1 + int(tx_delta / flush_interval)
eta = estimate_txs_remaining() / tx_per_sec_last
self.logger.info(f'tx/sec since genesis: {tx_per_sec_gen:,d}, '
f'since last flush: {tx_per_sec_last:,d}')
self.logger.info(f'sync time: {formatted_time(self.wall_time)} '
f'ETA: {formatted_time(eta)}')
def flush_fs(self, flush_data):
'''Write headers, tx counts and block tx hashes to the filesystem.
The first height to write is self.fs_height + 1. The FS
metadata is all append-only, so in a crash we just pick up
again from the height stored in the DB.
'''
prior_tx_count = (self.tx_counts[self.fs_height]
if self.fs_height >= 0 else 0)
assert len(flush_data.block_tx_hashes) == len(flush_data.headers)
assert flush_data.height == self.fs_height + len(flush_data.headers)
assert flush_data.tx_count == (self.tx_counts[-1] if self.tx_counts
else 0)
assert len(self.tx_counts) == flush_data.height + 1
hashes = b''.join(flush_data.block_tx_hashes)
flush_data.block_tx_hashes.clear()
assert len(hashes) % 32 == 0
assert len(hashes) // 32 == flush_data.tx_count - prior_tx_count
# Write the headers, tx counts, and tx hashes
start_time = time.time()
height_start = self.fs_height + 1
offset = self.header_offset(height_start)
self.headers_file.write(offset, b''.join(flush_data.headers))
self.fs_update_header_offsets(offset, height_start, flush_data.headers)
flush_data.headers.clear()
offset = height_start * self.tx_counts.itemsize
self.tx_counts_file.write(offset,
self.tx_counts[height_start:].tobytes())
offset = prior_tx_count * 32
self.hashes_file.write(offset, hashes)
self.fs_height = flush_data.height
self.fs_tx_count = flush_data.tx_count
if self.utxo_db.for_sync:
elapsed = time.time() - start_time
self.logger.info(f'flushed filesystem data in {elapsed:.2f}s')
def flush_history(self):
self.history.flush()
def flush_utxo_db(self, batch, flush_data):
'''Flush the cached DB writes and UTXO set to the batch.'''
# Care is needed because the writes generated by flushing the
# UTXO state may have keys in common with our write cache or
# may be in the DB already.
start_time = time.time()
add_count = len(flush_data.adds)
spend_count = len(flush_data.deletes) // 2
# Spends
batch_delete = batch.delete
for key in sorted(flush_data.deletes):
batch_delete(key)
flush_data.deletes.clear()
# New UTXOs
batch_put = batch.put
for key, value in flush_data.adds.items():
# suffix = tx_idx + tx_num
hashX = value[:-12]
suffix = key[-2:] + value[-12:-8]
batch_put(b'h' + key[:4] + suffix, hashX)
batch_put(b'u' + hashX + suffix, value[-8:])
flush_data.adds.clear()
# New undo information
self.flush_undo_infos(batch_put, flush_data.undo_infos)
flush_data.undo_infos.clear()
if self.utxo_db.for_sync:
block_count = flush_data.height - self.db_height
tx_count = flush_data.tx_count - self.db_tx_count
elapsed = time.time() - start_time
self.logger.info(f'flushed {block_count:,d} blocks with '
f'{tx_count:,d} txs, {add_count:,d} UTXO adds, '
f'{spend_count:,d} spends in '
f'{elapsed:.1f}s, committing...')
self.utxo_flush_count = self.history.flush_count
self.db_height = flush_data.height
self.db_tx_count = flush_data.tx_count
self.db_tip = flush_data.tip
def flush_state(self, batch):
'''Flush chain state to the batch.'''
now = time.time()
self.wall_time += now - self.last_flush
self.last_flush = now
self.last_flush_tx_count = self.fs_tx_count
self.write_utxo_state(batch)
def flush_backup(self, flush_data, touched):
'''Like flush_dbs() but when backing up. All UTXOs are flushed.'''
assert not flush_data.headers
assert not flush_data.block_tx_hashes
assert flush_data.height < self.db_height
self.history.assert_flushed()
start_time = time.time()
tx_delta = flush_data.tx_count - self.last_flush_tx_count
self.backup_fs(flush_data.height, flush_data.tx_count)
self.history.backup(touched, flush_data.tx_count)
with self.utxo_db.write_batch() as batch:
self.flush_utxo_db(batch, flush_data)
# Flush state last as it reads the wall time.
self.flush_state(batch)
elapsed = self.last_flush - start_time
self.logger.info(f'backup flush #{self.history.flush_count:,d} took '
f'{elapsed:.1f}s. Height {flush_data.height:,d} '
f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')
def fs_update_header_offsets(self, offset_start, height_start, headers):
if self.coin.STATIC_BLOCK_HEADERS:
@ -152,37 +363,14 @@ class DB(object):
return self.dynamic_header_offset(height + 1)\
- self.dynamic_header_offset(height)
def fs_update(self, fs_height, headers, block_tx_hashes):
'''Write headers, the tx_count array and block tx hashes to disk.
def backup_fs(self, height, tx_count):
'''Back up during a reorg. This just updates our pointers.'''
self.fs_height = height
self.fs_tx_count = tx_count
# Truncate header_mc: header count is 1 more than the height.
self.header_mc.truncate(height + 1)
Their first height is fs_height. No recorded DB state is
updated. These arrays are all append only, so in a crash we
just pick up again from the DB height.
'''
blocks_done = len(headers)
height_start = fs_height + 1
new_height = fs_height + blocks_done
prior_tx_count = (self.tx_counts[fs_height] if fs_height >= 0 else 0)
cur_tx_count = self.tx_counts[-1] if self.tx_counts else 0
txs_done = cur_tx_count - prior_tx_count
assert len(block_tx_hashes) == blocks_done
assert len(self.tx_counts) == new_height + 1
hashes = b''.join(block_tx_hashes)
assert len(hashes) % 32 == 0
assert len(hashes) // 32 == txs_done
# Write the headers, tx counts, and tx hashes
offset = self.header_offset(height_start)
self.headers_file.write(offset, b''.join(headers))
self.fs_update_header_offsets(offset, height_start, headers)
offset = height_start * self.tx_counts.itemsize
self.tx_counts_file.write(offset,
self.tx_counts[height_start:].tobytes())
offset = prior_tx_count * 32
self.hashes_file.write(offset, hashes)
def read_headers(self, start_height, count):
async def read_headers(self, start_height, count):
'''Requires start_height >= 0, count >= 0. Reads as many headers as
are available starting at start_height up to count. This
would be zero if start_height is beyond self.db_height, for
@ -191,16 +379,20 @@ class DB(object):
Returns a (binary, n) pair where binary is the concatenated
binary headers, and n is the count of headers returned.
'''
# Read some from disk
if start_height < 0 or count < 0:
raise self.DBError('{:,d} headers starting at {:,d} not on disk'
.format(count, start_height))
disk_count = max(0, min(count, self.db_height + 1 - start_height))
if disk_count:
offset = self.header_offset(start_height)
size = self.header_offset(start_height + disk_count) - offset
return self.headers_file.read(offset, size), disk_count
return b'', 0
raise self.DBError(f'{count:,d} headers starting at '
f'{start_height:,d} not on disk')
def read_headers():
# Read some from disk
disk_count = max(0, min(count, self.db_height + 1 - start_height))
if disk_count:
offset = self.header_offset(start_height)
size = self.header_offset(start_height + disk_count) - offset
return self.headers_file.read(offset, size), disk_count
return b'', 0
return await run_in_thread(read_headers)
def fs_tx_hash(self, tx_num):
'''Return a pair (tx_hash, tx_height) for the given tx number.
@ -213,8 +405,8 @@ class DB(object):
tx_hash = self.hashes_file.read(tx_num * 32, 32)
return tx_hash, tx_height
def fs_block_hashes(self, height, count):
headers_concat, headers_count = self.read_headers(height, count)
async def fs_block_hashes(self, height, count):
headers_concat, headers_count = await self.read_headers(height, count)
if headers_count != count:
raise self.DBError('only got {:,d} headers starting at {:,d}, not '
'{:,d}'.format(headers_count, height, count))
@ -227,15 +419,19 @@ class DB(object):
return [self.coin.header_hash(header) for header in headers]
def get_history(self, hashX, limit=1000):
'''Generator that returns an unpruned, sorted list of (tx_hash,
height) tuples of confirmed transactions that touched the address,
earliest in the blockchain first. Includes both spending and
receiving transactions. By default yields at most 1000 entries.
Set limit to None to get them all.
async def limited_history(self, hashX, *, limit=1000):
'''Return an unpruned, sorted list of (tx_hash, height) tuples of
confirmed transactions that touched the address, earliest in
the blockchain first. Includes both spending and receiving
transactions. By default returns at most 1000 entries. Set
limit to None to get them all.
'''
for tx_num in self.history.get_txnums(hashX, limit):
yield self.fs_tx_hash(tx_num)
def read_history():
tx_nums = list(self.history.get_txnums(hashX, limit))
fs_tx_hash = self.fs_tx_hash
return [fs_tx_hash(tx_num) for tx_num in tx_nums]
return await run_in_thread(read_history)
# -- Undo information
@ -345,6 +541,11 @@ class DB(object):
self.wall_time = state['wall_time']
self.first_sync = state['first_sync']
# These are our state as we move ahead of DB state
self.fs_height = self.db_height
self.fs_tx_count = self.db_tx_count
self.last_flush_tx_count = self.fs_tx_count
# Log some stats
self.logger.info('DB version: {:d}'.format(self.db_version))
self.logger.info('coin: {}'.format(self.coin.NAME))
@ -352,6 +553,8 @@ class DB(object):
self.logger.info('height: {:,d}'.format(self.db_height))
self.logger.info('tip: {}'.format(hash_to_hex_str(self.db_tip)))
self.logger.info('tx count: {:,d}'.format(self.db_tx_count))
if self.utxo_db.for_sync:
self.logger.info(f'flushing DB cache at {self.env.cache_MB:,d} MB')
if self.first_sync:
self.logger.info('sync time so far: {}'
.format(util.formatted_time(self.wall_time)))
@ -375,28 +578,25 @@ class DB(object):
with self.utxo_db.write_batch() as batch:
self.write_utxo_state(batch)
def get_balance(self, hashX):
'''Returns the confirmed balance of an address.'''
return sum(utxo.value for utxo in self.get_utxos(hashX, limit=None))
def get_utxos(self, hashX, limit=1000):
'''Generator that yields all UTXOs for an address sorted in no
particular order. By default yields at most 1000 entries.
Set limit to None to get them all.
async def all_utxos(self, hashX):
'''Return all UTXOs for an address, sorted in no particular order.
'''
limit = util.resolve_limit(limit)
s_unpack = unpack
# Key: b'u' + address_hashX + tx_idx + tx_num
# Value: the UTXO value as a 64-bit unsigned integer
prefix = b'u' + hashX
for db_key, db_value in self.utxo_db.iterator(prefix=prefix):
if limit == 0:
return
limit -= 1
tx_pos, tx_num = s_unpack('<HI', db_key[-6:])
value, = unpack('<Q', db_value)
tx_hash, height = self.fs_tx_hash(tx_num)
yield UTXO(tx_num, tx_pos, tx_hash, height, value)
def read_utxos():
utxos = []
utxos_append = utxos.append
s_unpack = unpack
# Key: b'u' + address_hashX + tx_idx + tx_num
# Value: the UTXO value as a 64-bit unsigned integer
prefix = b'u' + hashX
for db_key, db_value in self.utxo_db.iterator(prefix=prefix):
tx_pos, tx_num = s_unpack('<HI', db_key[-6:])
value, = unpack('<Q', db_value)
tx_hash, height = self.fs_tx_hash(tx_num)
utxos_append(UTXO(tx_num, tx_pos, tx_hash, height, value))
return utxos
return await run_in_thread(read_utxos)
async def lookup_utxos(self, prevouts):
'''For each prevout, lookup it up in the DB and return a (hashX,

View File

@ -11,6 +11,7 @@
import array
import ast
import bisect
import time
from collections import defaultdict
from functools import partial
from struct import pack, unpack
@ -31,10 +32,14 @@ class History(object):
self.unflushed_count = 0
self.db = None
def open_db(self, db_class, for_sync, utxo_flush_count):
def open_db(self, db_class, for_sync, utxo_flush_count, compacting):
self.db = db_class('hist', for_sync)
self.read_state()
self.clear_excess(utxo_flush_count)
# An incomplete compaction needs to be cancelled otherwise
# restarting it will corrupt the history
if not compacting:
self._cancel_compaction()
return self.flush_count
def close_db(self):
@ -80,7 +85,7 @@ class History(object):
if flush_id > utxo_flush_count:
keys.append(key)
self.logger.info('deleting {:,d} history entries'.format(len(keys)))
self.logger.info(f'deleting {len(keys):,d} history entries')
self.flush_count = utxo_flush_count
with self.db.write_batch() as batch:
@ -119,6 +124,7 @@ class History(object):
assert not self.unflushed
def flush(self):
start_time = time.time()
self.flush_count += 1
flush_id = pack('>H', self.flush_count)
unflushed = self.unflushed
@ -132,7 +138,11 @@ class History(object):
count = len(unflushed)
unflushed.clear()
self.unflushed_count = 0
return count
if self.db.for_sync:
elapsed = time.time() - start_time
self.logger.info(f'flushed history in {elapsed:.1f}s '
f'for {count:,d} addrs')
def backup(self, hashXs, tx_count):
# Not certain this is needed, but it doesn't hurt
@ -161,7 +171,7 @@ class History(object):
batch.put(key, value)
self.write_state(batch)
return nremoves
self.logger.info(f'backing up removed {nremoves:,d} history entries')
def get_txnums(self, hashX, limit=1000):
'''Generator that returns an unpruned, sorted list of tx_nums in the
@ -307,7 +317,7 @@ class History(object):
100 * cursor / 65536))
return write_size
def cancel_compaction(self):
def _cancel_compaction(self):
if self.comp_cursor != -1:
self.logger.warning('cancelling in-progress history compaction')
self.comp_flush_count = -1

View File

@ -7,13 +7,14 @@
'''Mempool handling.'''
import asyncio
import itertools
import time
from abc import ABC, abstractmethod
from asyncio import Lock
from collections import defaultdict
import attr
from aiorpcx import TaskGroup, run_in_thread
from aiorpcx import TaskGroup, run_in_thread, sleep
from electrumx.lib.hash import hash_to_hex_str, hex_str_to_hash
from electrumx.lib.util import class_logger, chunks
@ -30,9 +31,60 @@ class MemPoolTx(object):
size = attr.ib()
@attr.s(slots=True)
class MemPoolTxSummary(object):
    '''Summary of a single mempool transaction as returned by
    MemPool.transaction_summaries().'''
    # the transaction hash (bytes)
    hash = attr.ib()
    # the transaction fee (total input value minus total output value)
    fee = attr.ib()
    # True if any input spends the output of another mempool tx
    has_unconfirmed_inputs = attr.ib()
class MemPoolAPI(ABC):
    '''A concrete instance of this class is passed to the MemPool object
    and used by it to query DB and blockchain state.

    All methods except cached_height() are coroutines.
    '''

    @abstractmethod
    async def height(self):
        '''Query bitcoind for its height.'''

    @abstractmethod
    def cached_height(self):
        '''Return the height of bitcoind the last time it was queried,
        for any reason, without actually querying it.
        '''

    @abstractmethod
    async def mempool_hashes(self):
        '''Query bitcoind for the hashes of all transactions in its
        mempool, returned as a list.'''

    @abstractmethod
    async def raw_transactions(self, hex_hashes):
        '''Query bitcoind for the serialized raw transactions with the given
        hashes.  Missing transactions are returned as None.

        hex_hashes is an iterable of hexadecimal hash strings.'''

    @abstractmethod
    async def lookup_utxos(self, prevouts):
        '''Return a list of (hashX, value) pairs each prevout if unspent,
        otherwise return None if spent or not found.

        prevouts - an iterable of (hash, index) pairs
        '''

    @abstractmethod
    async def on_mempool(self, touched, height):
        '''Called each time the mempool is synchronized.  touched is a set of
        hashXs touched since the previous call.  height is the
        daemon's height at the time the mempool was obtained.'''
class MemPool(object):
'''Representation of the daemon's mempool.
coin - a coin class from coins.py
api - an object implementing MemPoolAPI
Updated regularly in caught-up state. Goal is to enable efficient
response to the calls in the external interface. To that end we
maintain the following maps:
@ -41,23 +93,42 @@ class MemPool(object):
hashXs: hashX -> set of all hashes of txs touching the hashX
'''
def __init__(self, coin, daemon, notifications, lookup_utxos):
self.logger = class_logger(__name__, self.__class__.__name__)
def __init__(self, coin, api, refresh_secs=5.0, log_status_secs=120.0):
assert isinstance(api, MemPoolAPI)
self.coin = coin
self.lookup_utxos = lookup_utxos
self.daemon = daemon
self.notifications = notifications
self.api = api
self.logger = class_logger(__name__, self.__class__.__name__)
self.txs = {}
self.hashXs = defaultdict(set) # None can be a key
self.cached_compact_histogram = []
self.refresh_secs = refresh_secs
self.log_status_secs = log_status_secs
# Prevents mempool refreshes during fee histogram calculation
self.lock = Lock()
async def _log_stats(self):
async def _logging(self, synchronized_event):
'''Print regular logs of mempool stats.'''
self.logger.info('beginning processing of daemon mempool. '
'This can take some time...')
start = time.time()
await synchronized_event.wait()
elapsed = time.time() - start
self.logger.info(f'synced in {elapsed:.2f}s')
while True:
self.logger.info(f'{len(self.txs):,d} txs '
f'touching {len(self.hashXs):,d} addresses')
await asyncio.sleep(120)
await sleep(self.log_status_secs)
await synchronized_event.wait()
def _update_histogram(self):
async def _refresh_histogram(self, synchronized_event):
while True:
await synchronized_event.wait()
async with self.lock:
# Threaded as can be expensive
await run_in_thread(self._update_histogram, 100_000)
await sleep(self.coin.MEMPOOL_HISTOGRAM_REFRESH_SECS)
def _update_histogram(self, bin_size):
# Build a histogram by fee rate
histogram = defaultdict(int)
for tx in self.txs.values():
@ -74,7 +145,6 @@ class MemPool(object):
compact = []
cum_size = 0
r = 0 # ?
bin_size = 100 * 1000
for fee_rate, size in sorted(histogram.items(), reverse=True):
cum_size += size
if cum_size + r > bin_size:
@ -129,21 +199,18 @@ class MemPool(object):
async def _refresh_hashes(self, synchronized_event):
'''Refresh our view of the daemon's mempool.'''
sleep = 5
histogram_refresh = self.coin.MEMPOOL_HISTOGRAM_REFRESH_SECS // sleep
for loop_count in itertools.count():
height = self.daemon.cached_height()
hex_hashes = await self.daemon.mempool_hashes()
if height != await self.daemon.height():
while True:
height = self.api.cached_height()
hex_hashes = await self.api.mempool_hashes()
if height != await self.api.height():
continue
hashes = set(hex_str_to_hash(hh) for hh in hex_hashes)
touched = await self._process_mempool(hashes)
async with self.lock:
touched = await self._process_mempool(hashes)
synchronized_event.set()
await self.notifications.on_mempool(touched, height)
# Thread mempool histogram refreshes - they can be expensive
if loop_count % histogram_refresh == 0:
await run_in_thread(self._update_histogram)
await asyncio.sleep(sleep)
synchronized_event.clear()
await self.api.on_mempool(touched, height)
await sleep(self.refresh_secs)
async def _process_mempool(self, all_hashes):
# Re-sync with the new set of hashes
@ -176,9 +243,6 @@ class MemPool(object):
tx_map.update(deferred)
utxo_map.update(unspent)
# Handle the stragglers
if len(tx_map) >= 10:
self.logger.info(f'{len(tx_map)} stragglers')
prior_count = 0
# FIXME: this is not particularly efficient
while tx_map and len(tx_map) != prior_count:
@ -193,7 +257,7 @@ class MemPool(object):
async def _fetch_and_accept(self, hashes, all_hashes, touched):
'''Fetch a list of mempool transactions.'''
hex_hashes_iter = (hash_to_hex_str(hash) for hash in hashes)
raw_txs = await self.daemon.getrawtransactions(hex_hashes_iter)
raw_txs = await self.api.raw_transactions(hex_hashes_iter)
def deserialize_txs(): # This function is pure
to_hashX = self.coin.hashX_from_script
@ -225,7 +289,7 @@ class MemPool(object):
prevouts = tuple(prevout for tx in tx_map.values()
for prevout in tx.prevouts
if prevout[0] not in all_hashes)
utxos = await self.lookup_utxos(prevouts)
utxos = await self.api.lookup_utxos(prevouts)
utxo_map = {prevout: utxo for prevout, utxo in zip(prevouts, utxos)}
return self._accept_transactions(tx_map, utxo_map, touched)
@ -235,19 +299,11 @@ class MemPool(object):
#
async def keep_synchronized(self, synchronized_event):
'''Starts the mempool synchronizer.
Waits for an initial synchronization before returning.
'''
self.logger.info('beginning processing of daemon mempool. '
'This can take some time...')
async with TaskGroup() as group:
'''Keep the mempool synchronized with the daemon.'''
async with TaskGroup(wait=any) as group:
await group.spawn(self._refresh_hashes(synchronized_event))
start = time.time()
await synchronized_event.wait()
elapsed = time.time() - start
self.logger.info(f'synced in {elapsed:.2f}s')
await group.spawn(self._log_stats())
await group.spawn(self._refresh_histogram(synchronized_event))
await group.spawn(self._logging(synchronized_event))
async def balance_delta(self, hashX):
'''Return the unconfirmed amount in the mempool for hashX.
@ -255,7 +311,6 @@ class MemPool(object):
Can be positive or negative.
'''
value = 0
# hashXs is a defaultdict
if hashX in self.hashXs:
for hash in self.hashXs[hashX]:
tx = self.txs[hash]
@ -271,7 +326,8 @@ class MemPool(object):
'''Return a set of (prev_hash, prev_idx) pairs from mempool
transactions that touch hashX.
None, some or all of these may be spends of the hashX.
None, some or all of these may be spends of the hashX, but all
actual spends of it (in the DB or mempool) will be included.
'''
result = set()
for tx_hash in self.hashXs.get(hashX, ()):
@ -280,18 +336,12 @@ class MemPool(object):
return result
async def transaction_summaries(self, hashX):
'''Return a list of (tx_hash, tx_fee, unconfirmed) tuples for
mempool entries for the hashX.
unconfirmed is True if any txin is unconfirmed.
'''
# hashXs is a defaultdict, so use get() to query
'''Return a list of MemPoolTxSummary objects for the hashX.'''
result = []
for tx_hash in self.hashXs.get(hashX, ()):
tx = self.txs[tx_hash]
unconfirmed = any(prev_hash in self.txs
for prev_hash, prev_idx in tx.prevouts)
result.append((tx_hash, tx.fee, unconfirmed))
has_ui = any(hash in self.txs for hash, idx in tx.prevouts)
result.append(MemPoolTxSummary(tx_hash, tx.fee, has_ui))
return result
async def unordered_UTXOs(self, hashX):
@ -302,7 +352,6 @@ class MemPool(object):
the outputs.
'''
utxos = []
# hashXs is a defaultdict, so use get() to query
for tx_hash in self.hashXs.get(hashX, ()):
tx = self.txs.get(tx_hash)
for pos, (hX, value) in enumerate(tx.out_pairs):

View File

@ -313,7 +313,7 @@ class PeerManager(object):
# Check prior header too in case of hard fork.
check_height = min(our_height, their_height)
raw_header = self.chain_state.raw_header(check_height)
raw_header = await self.chain_state.raw_header(check_height)
if ptuple >= (1, 4):
ours = raw_header.hex()
message = 'blockchain.block.header'

View File

@ -61,6 +61,13 @@ def non_negative_integer(value):
f'{value} should be a non-negative integer')
def assert_boolean(value):
    '''Return the value if it is a boolean, otherwise raise an RPCError.'''
    if value in (False, True):
        return value
    raise RPCError(BAD_REQUEST, f'{value} should be a boolean value')
def assert_tx_hash(value):
'''Raise an RPCError if the value is not a valid transaction
hash.'''
@ -447,11 +454,17 @@ class SessionManager(object):
'''The number of connections that we've sent something to.'''
return len(self.sessions)
async def get_history(self, hashX):
async def limited_history(self, hashX):
'''A caching layer.'''
hc = self._history_cache
if hashX not in hc:
hc[hashX] = await self.chain_state.get_history(hashX)
# History DoS limit. Each element of history is about 99
# bytes when encoded as JSON. This limits resource usage
# on bloated history requests, and uses a smaller divisor
# so large requests are logged before refusing them.
limit = self.env.max_send // 97
hc[hashX] = await self.chain_state.limited_history(hashX,
limit=limit)
return hc[hashX]
async def _notify_sessions(self, height, touched):
@ -705,7 +718,7 @@ class ElectrumX(SessionBase):
if height_changed:
self.notified_height = height
if self.subscribe_headers:
args = (self.subscribe_headers_result(height), )
args = (await self.subscribe_headers_result(height), )
await self.send_notification('blockchain.headers.subscribe',
args)
@ -713,49 +726,44 @@ class ElectrumX(SessionBase):
if touched or (height_changed and self.mempool_statuses):
await self.notify_touched(touched)
def assert_boolean(self, value):
'''Return param value it is boolean otherwise raise an RPCError.'''
if value in (False, True):
return value
raise RPCError(BAD_REQUEST, f'{value} should be a boolean value')
def raw_header(self, height):
async def raw_header(self, height):
'''Return the binary header at the given height.'''
try:
return self.chain_state.raw_header(height)
return await self.chain_state.raw_header(height)
except IndexError:
raise RPCError(BAD_REQUEST, f'height {height:,d} out of range')
raise RPCError(BAD_REQUEST, f'height {height:,d} '
'out of range') from None
def electrum_header(self, height):
async def electrum_header(self, height):
'''Return the deserialized header at the given height.'''
raw_header = self.raw_header(height)
raw_header = await self.raw_header(height)
return self.coin.electrum_header(raw_header, height)
def subscribe_headers_result(self, height):
async def subscribe_headers_result(self, height):
'''The result of a header subscription for the given height.'''
if self.subscribe_headers_raw:
raw_header = self.raw_header(height)
raw_header = await self.raw_header(height)
return {'hex': raw_header.hex(), 'height': height}
return self.electrum_header(height)
return await self.electrum_header(height)
def _headers_subscribe(self, raw):
async def _headers_subscribe(self, raw):
'''Subscribe to get headers of new blocks.'''
self.subscribe_headers = True
self.subscribe_headers_raw = self.assert_boolean(raw)
self.subscribe_headers_raw = assert_boolean(raw)
self.notified_height = self.db_height()
return self.subscribe_headers_result(self.notified_height)
return await self.subscribe_headers_result(self.notified_height)
async def headers_subscribe(self):
'''Subscribe to get raw headers of new blocks.'''
return self._headers_subscribe(True)
return await self._headers_subscribe(True)
async def headers_subscribe_True(self, raw=True):
'''Subscribe to get headers of new blocks.'''
return self._headers_subscribe(raw)
return await self._headers_subscribe(raw)
async def headers_subscribe_False(self, raw=False):
'''Subscribe to get headers of new blocks.'''
return self._headers_subscribe(raw)
return await self._headers_subscribe(raw)
async def add_peer(self, features):
'''Add a peer (but only if the peer resolves to the source).'''
@ -771,15 +779,16 @@ class ElectrumX(SessionBase):
Status is a hex string, but must be None if there is no history.
'''
# Note history is ordered and mempool unordered in electrum-server
# For mempool, height is -1 if unconfirmed txins, otherwise 0
history = await self.session_mgr.get_history(hashX)
# For mempool, height is -1 if it has unconfirmed inputs, otherwise 0
db_history = await self.session_mgr.limited_history(hashX)
mempool = await self.mempool.transaction_summaries(hashX)
status = ''.join('{}:{:d}:'.format(hash_to_hex_str(tx_hash), height)
for tx_hash, height in history)
status += ''.join('{}:{:d}:'.format(hash_to_hex_str(hex_hash),
-unconfirmed)
for hex_hash, tx_fee, unconfirmed in mempool)
status = ''.join(f'{hash_to_hex_str(tx_hash)}:'
f'{height:d}:'
for tx_hash, height in db_history)
status += ''.join(f'{hash_to_hex_str(tx.hash)}:'
f'{-tx.has_unconfirmed_inputs:d}:'
for tx in mempool)
if status:
status = sha256(status.encode()).hex()
else:
@ -795,7 +804,7 @@ class ElectrumX(SessionBase):
async def hashX_listunspent(self, hashX):
'''Return the list of UTXOs of a script hash, including mempool
effects.'''
utxos = await self.chain_state.get_utxos(hashX)
utxos = await self.chain_state.all_utxos(hashX)
utxos = sorted(utxos)
utxos.extend(await self.mempool.unordered_UTXOs(hashX))
spends = await self.mempool.potential_spends(hashX)
@ -852,7 +861,7 @@ class ElectrumX(SessionBase):
return await self.hashX_subscribe(hashX, address)
async def get_balance(self, hashX):
utxos = await self.chain_state.get_utxos(hashX)
utxos = await self.chain_state.all_utxos(hashX)
confirmed = sum(utxo.value for utxo in utxos)
unconfirmed = await self.mempool.balance_delta(hashX)
return {'confirmed': confirmed, 'unconfirmed': unconfirmed}
@ -864,15 +873,15 @@ class ElectrumX(SessionBase):
async def unconfirmed_history(self, hashX):
# Note unconfirmed history is unordered in electrum-server
# Height is -1 if unconfirmed txins, otherwise 0
mempool = await self.mempool.transaction_summaries(hashX)
return [{'tx_hash': hash_to_hex_str(tx_hash), 'height': -unconfirmed,
'fee': fee}
for tx_hash, fee, unconfirmed in mempool]
# height is -1 if it has unconfirmed inputs, otherwise 0
return [{'tx_hash': hash_to_hex_str(tx.hash),
'height': -tx.has_unconfirmed_inputs,
'fee': tx.fee}
for tx in await self.mempool.transaction_summaries(hashX)]
async def confirmed_and_unconfirmed_history(self, hashX):
# Note history is ordered but unconfirmed is unordered in e-s
history = await self.session_mgr.get_history(hashX)
history = await self.session_mgr.limited_history(hashX)
conf = [{'tx_hash': hash_to_hex_str(tx_hash), 'height': height}
for tx_hash, height in history]
return conf + await self.unconfirmed_history(hashX)
@ -899,14 +908,14 @@ class ElectrumX(SessionBase):
hashX = scripthash_to_hashX(scripthash)
return await self.hashX_subscribe(hashX, scripthash)
def _merkle_proof(self, cp_height, height):
async def _merkle_proof(self, cp_height, height):
max_height = self.db_height()
if not height <= cp_height <= max_height:
raise RPCError(BAD_REQUEST,
f'require header height {height:,d} <= '
f'cp_height {cp_height:,d} <= '
f'chain height {max_height:,d}')
branch, root = self.chain_state.header_branch_and_root(
branch, root = await self.chain_state.header_branch_and_root(
cp_height + 1, height)
return {
'branch': [hash_to_hex_str(elt) for elt in branch],
@ -918,11 +927,11 @@ class ElectrumX(SessionBase):
dictionary with a merkle proof.'''
height = non_negative_integer(height)
cp_height = non_negative_integer(cp_height)
raw_header_hex = self.raw_header(height).hex()
raw_header_hex = (await self.raw_header(height)).hex()
if cp_height == 0:
return raw_header_hex
result = {'header': raw_header_hex}
result.update(self._merkle_proof(cp_height, height))
result.update(await self._merkle_proof(cp_height, height))
return result
async def block_header_13(self, height):
@ -944,11 +953,12 @@ class ElectrumX(SessionBase):
max_size = self.MAX_CHUNK_SIZE
count = min(count, max_size)
headers, count = self.chain_state.read_headers(start_height, count)
headers, count = await self.chain_state.read_headers(start_height,
count)
result = {'hex': headers.hex(), 'count': count, 'max': max_size}
if count and cp_height:
last_height = start_height + count - 1
result.update(self._merkle_proof(cp_height, last_height))
result.update(await self._merkle_proof(cp_height, last_height))
return result
async def block_headers_12(self, start_height, count):
@ -961,7 +971,7 @@ class ElectrumX(SessionBase):
index = non_negative_integer(index)
size = self.coin.CHUNK_SIZE
start_height = index * size
headers, count = self.chain_state.read_headers(start_height, size)
headers, _ = await self.chain_state.read_headers(start_height, size)
return headers.hex()
async def block_get_header(self, height):
@ -969,7 +979,7 @@ class ElectrumX(SessionBase):
height: the header's height'''
height = non_negative_integer(height)
return self.electrum_header(height)
return await self.electrum_header(height)
def is_tor(self):
'''Try to detect if the connection is to a tor hidden service we are

View File

@ -271,6 +271,7 @@ def _exponent_to_bytes(exponent):
'''Convert an exponent to 32 big-endian bytes'''
return (bytes(32) + int_to_bytes(exponent))[-32:]
def _from_extended_key(ekey):
'''Return a PubKey or PrivKey from an extended key raw bytes.'''
if not isinstance(ekey, (bytes, bytearray)):
@ -295,6 +296,7 @@ def _from_extended_key(ekey):
return key, coin
def from_extended_key_string(ekey_str):
'''Given an extended key string, such as

View File

@ -149,72 +149,83 @@ class Source(object):
def __init__(self, length):
self._hashes = [os.urandom(32) for _ in range(length)]
def hashes(self, start, count):
async def hashes(self, start, count):
assert start >= 0
assert start + count <= len(self._hashes)
return self._hashes[start: start + count]
def test_merkle_cache():
@pytest.mark.asyncio
async def test_merkle_cache():
lengths = (*range(1, 18), 31, 32, 33, 57)
source = Source(max(lengths))
source = Source(max(lengths)).hashes
for length in lengths:
cache = MerkleCache(merkle, source, length)
cache = MerkleCache(merkle, source)
await cache.initialize(length)
# Simulate all possible checkpoints
for cp_length in range(1, length + 1):
cp_hashes = source.hashes(0, cp_length)
cp_hashes = await source(0, cp_length)
# All possible indices
for index in range(cp_length):
# Compare correct answer with cache
branch, root = merkle.branch_and_root(cp_hashes, index)
branch2, root2 = cache.branch_and_root(cp_length, index)
branch2, root2 = await cache.branch_and_root(cp_length, index)
assert branch == branch2
assert root == root2
def test_merkle_cache_extension():
source = Source(64)
@pytest.mark.asyncio
async def test_merkle_cache_extension():
source = Source(64).hashes
for length in range(14, 18):
for cp_length in range(30, 36):
cache = MerkleCache(merkle, source, length)
cp_hashes = source.hashes(0, cp_length)
cache = MerkleCache(merkle, source)
await cache.initialize(length)
cp_hashes = await source(0, cp_length)
# All possible indices
for index in range(cp_length):
# Compare correct answer with cache
branch, root = merkle.branch_and_root(cp_hashes, index)
branch2, root2 = cache.branch_and_root(cp_length, index)
branch2, root2 = await cache.branch_and_root(cp_length, index)
assert branch == branch2
assert root == root2
def test_merkle_cache_truncation():
@pytest.mark.asyncio
async def test_merkle_cache_truncation():
max_length = 33
source = Source(max_length)
source = Source(max_length).hashes
for length in range(max_length - 2, max_length + 1):
for trunc_length in range(1, 20, 3):
cache = MerkleCache(merkle, source, length)
cache = MerkleCache(merkle, source)
await cache.initialize(length)
cache.truncate(trunc_length)
assert cache.length <= trunc_length
for cp_length in range(1, length + 1, 3):
cp_hashes = source.hashes(0, cp_length)
cp_hashes = await source(0, cp_length)
# All possible indices
for index in range(cp_length):
# Compare correct answer with cache
branch, root = merkle.branch_and_root(cp_hashes, index)
branch2, root2 = cache.branch_and_root(cp_length, index)
branch2, root2 = await cache.branch_and_root(cp_length,
index)
assert branch == branch2
assert root == root2
# Truncation is a no-op if longer
cache = MerkleCache(merkle, source, 10)
cache = MerkleCache(merkle, source)
await cache.initialize(10)
level = cache.level.copy()
for length in range(10, 13):
cache.truncate(length)
assert cache.level == level
assert cache.length == 10
def test_truncation_bad():
cache = MerkleCache(merkle, Source(10), 10)
@pytest.mark.asyncio
async def test_truncation_bad():
cache = MerkleCache(merkle, Source(10).hashes)
await cache.initialize(10)
with pytest.raises(TypeError):
cache.truncate(1.0)
for n in (-1, 0):
@ -222,43 +233,48 @@ def test_truncation_bad():
cache.truncate(n)
def test_markle_cache_bad():
@pytest.mark.asyncio
async def test_markle_cache_bad():
length = 23
source = Source(length)
cache = MerkleCache(merkle, source, length)
cache.branch_and_root(5, 3)
source = Source(length).hashes
cache = MerkleCache(merkle, source)
await cache.initialize(length)
await cache.branch_and_root(5, 3)
with pytest.raises(TypeError):
cache.branch_and_root(5.0, 3)
await cache.branch_and_root(5.0, 3)
with pytest.raises(TypeError):
cache.branch_and_root(5, 3.0)
await cache.branch_and_root(5, 3.0)
with pytest.raises(ValueError):
cache.branch_and_root(0, -1)
await cache.branch_and_root(0, -1)
with pytest.raises(ValueError):
cache.branch_and_root(3, 3)
await cache.branch_and_root(3, 3)
def test_bad_extension():
@pytest.mark.asyncio
async def test_bad_extension():
length = 5
source = Source(length)
cache = MerkleCache(merkle, source, length)
source = Source(length).hashes
cache = MerkleCache(merkle, source)
await cache.initialize(length)
level = cache.level.copy()
with pytest.raises(AssertionError):
cache.branch_and_root(8, 0)
await cache.branch_and_root(8, 0)
# The bad extension should not destroy the cache
assert cache.level == level
assert cache.length == length
def time_it():
source = Source(500000)
async def time_it():
source = Source(500000).hashes
cp_length = 492000
import time
cache = MerkleCache(merkle, source)
cp_length = 492000
cp_hashes = source.hashes(0, cp_length)
await cache.initialize(cp_length)
cp_hashes = await source(0, cp_length)
brs2 = []
t1 = time.time()
for index in range(5, 400000, 500):
brs2.append(cache.branch_and_root(cp_length, index))
brs2.append(await cache.branch_and_root(cp_length, index))
t2 = time.time()
print(t2 - t1)
assert False

View File

@ -0,0 +1,502 @@
import logging
import os
from collections import defaultdict
from functools import partial
from random import randrange, choice
import pytest
from aiorpcx import Event, TaskGroup, sleep, spawn, ignore_after
from electrumx.server.mempool import MemPool, MemPoolAPI
from electrumx.lib.coins import BitcoinCash
from electrumx.lib.hash import HASHX_LEN, hex_str_to_hash, hash_to_hex_str
from electrumx.lib.tx import Tx, TxInput, TxOutput
from electrumx.lib.util import make_logger
coin = BitcoinCash
tx_hash_fn = coin.DESERIALIZER.TX_HASH_FN
def random_tx(hash160s, utxos):
    '''Create a random TX paying to some of the hash160s using some of the
    UTXOs.  Return a (tx, tx_hash, tx_bytes) triple.

    utxos maps (tx_hash, out_idx) -> (hashX, value); it is updated
    in-place for the effects of the TX: spent prevouts are removed and
    the TX's new outputs are added.
    '''
    # Create inputs spending random UTXOs; total the input value
    inputs = []
    n_inputs = min(randrange(1, 4), len(utxos))
    input_value = 0
    for n in range(n_inputs):
        prevout = choice(list(utxos))
        hashX, value = utxos.pop(prevout)
        inputs.append(TxInput(prevout[0], prevout[1], b'', 4294967295))
        input_value += value

    # Seomthing small is taken as a fee; never more than is available
    fee = min(input_value, randrange(500))
    input_value -= fee

    outputs = []
    n_outputs = randrange(1, 4)
    for n in range(n_outputs):
        # Use input_value + 1 so randrange() is never given an empty
        # range: the original randrange(input_value) raised ValueError
        # whenever the remaining input value was zero.
        value = randrange(input_value + 1)
        input_value -= value
        pk_script = coin.hash160_to_P2PKH_script(choice(hash160s))
        outputs.append(TxOutput(value, pk_script))

    tx = Tx(2, inputs, outputs, 0)
    tx_bytes = tx.serialize()
    tx_hash = tx_hash_fn(tx_bytes)
    # Make the new outputs spendable by later random TXs
    for n, output in enumerate(tx.outputs):
        utxos[(tx_hash, n)] = (coin.hashX_from_script(output.pk_script),
                               output.value)
    return tx, tx_hash, tx_bytes
class API(MemPoolAPI):
    '''A MemPoolAPI implementation backed by randomly-generated data.

    Holds a pool of addresses, a fake DB UTXO set paying to them, and a
    set of mempool transactions spending from that UTXO set.  Also
    provides helpers (balance_deltas, spends, summaries, touched, UTXOs)
    computing the answers the tests expect from the MemPool object.
    '''

    def __init__(self):
        self._height = 0
        # Height reported by cached_height() without querying
        self._cached_height = self._height
        # Create a pool of hash160s.  Map them to their script hashes
        # Create a bunch of UTXOs paying to those script hashes
        # Create a bunch of TXs that spend from the UTXO set and create
        # new outputs, which are added to the UTXO set for later TXs to
        # spend
        self.db_utxos = {}
        self.on_mempool_calls = []
        self.hashXs = []
        # Maps of mempool txs from tx_hash to raw and Tx object forms
        self.raw_txs = {}
        self.txs = {}
        self.ordered_adds = []

    def initialize(self, addr_count=100, db_utxo_count=100, mempool_size=50):
        '''Populate the fake DB UTXO set and the mempool with random data.'''
        hash160s = [os.urandom(20) for n in range(addr_count)]
        self.hashXs = [coin.hash160_to_P2PKH_hashX(hash160)
                       for hash160 in hash160s]
        prevouts = [(os.urandom(32), randrange(0, 10))
                    for n in range(db_utxo_count)]
        random_value = partial(randrange, coin.VALUE_PER_COIN * 10)
        self.db_utxos = {prevout: (choice(self.hashXs), random_value())
                         for prevout in prevouts}
        # Each mempool tx spends from whatever is still unspent, and its
        # outputs become spendable by subsequent txs
        unspent_utxos = self.db_utxos.copy()
        for n in range(mempool_size):
            tx, tx_hash, raw_tx = random_tx(hash160s, unspent_utxos)
            self.raw_txs[tx_hash] = raw_tx
            self.txs[tx_hash] = tx
            self.ordered_adds.append(tx_hash)

    def mempool_utxos(self):
        '''Return {(tx_hash, n): (hashX, value)} for all mempool tx
        outputs.'''
        utxos = {}
        for tx_hash, tx in self.txs.items():
            for n, output in enumerate(tx.outputs):
                hashX = coin.hashX_from_script(output.pk_script)
                utxos[(tx_hash, n)] = (hashX, output.value)
        return utxos

    def mempool_spends(self):
        '''Return all (prev_hash, prev_idx) pairs spent by mempool txs.'''
        return [(input.prev_hash, input.prev_idx)
                for tx in self.txs.values() for input in tx.inputs]

    def balance_deltas(self):
        '''Return mempool balance deltas indexed by hashX.

        A delta subtracts DB UTXOs spent by the mempool and adds
        unspent mempool outputs.
        '''
        deltas = defaultdict(int)
        utxos = self.mempool_utxos()
        for tx_hash, tx in self.txs.items():
            for n, input in enumerate(tx.inputs):
                prevout = (input.prev_hash, input.prev_idx)
                if prevout in utxos:
                    # In-mempool spend; remove so only unspent outputs
                    # remain in utxos below
                    utxos.pop(prevout)
                else:
                    hashX, value = self.db_utxos[prevout]
                    deltas[hashX] -= value
        for hashX, value in utxos.values():
            deltas[hashX] += value
        return deltas

    def spends(self):
        '''Return spends (prevout pairs) indexed by the spent hashX.'''
        spends = defaultdict(list)
        utxos = self.mempool_utxos()
        for tx_hash, tx in self.txs.items():
            for n, input in enumerate(tx.inputs):
                prevout = (input.prev_hash, input.prev_idx)
                if prevout in utxos:
                    hashX, value = utxos.pop(prevout)
                else:
                    hashX, value = self.db_utxos[prevout]
                spends[hashX].append(prevout)
        return spends

    def summaries(self):
        '''Return lists of (tx_hash, fee, has_unconfirmed_inputs) indexed
        by every hashX each tx touches.'''
        summaries = defaultdict(list)
        utxos = self.mempool_utxos()
        for tx_hash, tx in self.txs.items():
            fee = 0
            hashXs = set()
            has_ui = False
            for n, input in enumerate(tx.inputs):
                # An input is unconfirmed if it spends a mempool tx
                has_ui = has_ui or (input.prev_hash in self.txs)
                prevout = (input.prev_hash, input.prev_idx)
                if prevout in utxos:
                    hashX, value = utxos[prevout]
                else:
                    hashX, value = self.db_utxos[prevout]
                hashXs.add(hashX)
                fee += value
            for output in tx.outputs:
                hashXs.add(coin.hashX_from_script(output.pk_script))
                fee -= output.value
            summary = (tx_hash, fee, has_ui)
            for hashX in hashXs:
                summaries[hashX].append(summary)
        return summaries

    def touched(self, tx_hashes):
        '''Return the set of hashXs touched (spent from or paid to) by the
        given mempool tx hashes.'''
        touched = set()
        utxos = self.mempool_utxos()
        for tx_hash in tx_hashes:
            tx = self.txs[tx_hash]
            for n, input in enumerate(tx.inputs):
                prevout = (input.prev_hash, input.prev_idx)
                if prevout in utxos:
                    hashX, value = utxos[prevout]
                else:
                    hashX, value = self.db_utxos[prevout]
                touched.add(hashX)
            for output in tx.outputs:
                touched.add(coin.hashX_from_script(output.pk_script))
        return touched

    def UTXOs(self):
        '''Return lists of UTXO 5-tuples
        (-1, out_idx, tx_hash, 0, value) indexed by hashX.'''
        utxos = defaultdict(list)
        for tx_hash, tx in self.txs.items():
            for n, output in enumerate(tx.outputs):
                hashX = coin.hashX_from_script(output.pk_script)
                utxos[hashX].append((-1, n, tx_hash, 0, output.value))
        return utxos

    async def height(self):
        # Querying the height also updates the cached height
        await sleep(0)
        self._cached_height = self._height
        return self._height

    def cached_height(self):
        return self._cached_height

    async def mempool_hashes(self):
        '''Query bitcoind for the hashes of all transactions in its
        mempool, returned as a list.'''
        await sleep(0)
        return [hash_to_hex_str(hash) for hash in self.txs]

    async def raw_transactions(self, hex_hashes):
        '''Query bitcoind for the serialized raw transactions with the given
        hashes.  Missing transactions are returned as None.

        hex_hashes is an iterable of hexadecimal hash strings.'''
        await sleep(0)
        hashes = [hex_str_to_hash(hex_hash) for hex_hash in hex_hashes]
        return [self.raw_txs.get(hash) for hash in hashes]

    async def lookup_utxos(self, prevouts):
        '''Return a list of (hashX, value) pairs each prevout if unspent,
        otherwise return None if spent or not found.

        prevouts - an iterable of (hash, index) pairs
        '''
        await sleep(0)
        return [self.db_utxos.get(prevout) for prevout in prevouts]

    async def on_mempool(self, touched, height):
        '''Called each time the mempool is synchronized.  touched is a set of
        hashXs touched since the previous call.  height is the
        daemon's height at the time the mempool was obtained.'''
        # Record the call so tests can inspect notifications
        self.on_mempool_calls.append((touched, height))
        await sleep(0)
class DropAPI(API):
    '''An API simulating a daemon that drops the most recently added
    drop_count txs from its mempool the first time raw transactions are
    requested.'''

    def __init__(self, drop_count):
        super().__init__()
        self.drop_count = drop_count
        # Ensures the drop happens only once
        self.dropped = False

    async def raw_transactions(self, hex_hashes):
        if not self.dropped:
            self.dropped = True
            # Forget the last drop_count txs added to the mempool
            for hash in self.ordered_adds[-self.drop_count:]:
                del self.raw_txs[hash]
                del self.txs[hash]
        return await super().raw_transactions(hex_hashes)
def in_caplog(caplog, message):
    '''Return True if message appears as a substring of any captured log
    record's message.'''
    for record in caplog.records:
        if message in record.message:
            return True
    return False
@pytest.mark.asyncio
async def test_keep_synchronized(caplog):
    '''An empty mempool synchronizes and logs the expected messages.'''
    api = API()
    mempool = MemPool(coin, api)
    event = Event()
    # Run the synchronizer until the initial sync completes, then stop it
    with caplog.at_level(logging.INFO):
        async with TaskGroup() as group:
            await group.spawn(mempool.keep_synchronized, event)
            await event.wait()
            await group.cancel_remaining()

    assert in_caplog(caplog, 'beginning processing of daemon mempool')
    assert in_caplog(caplog, 'compact fee histogram')
    assert in_caplog(caplog, 'synced in ')
    assert in_caplog(caplog, '0 txs touching 0 addresses')
    assert not in_caplog(caplog, 'txs dropped')
@pytest.mark.asyncio
async def test_balance_delta():
    '''MemPool.balance_delta() matches the API's expected deltas.'''
    api = API()
    api.initialize()
    mempool = MemPool(coin, api)
    event = Event()
    # Run the synchronizer until the initial sync completes, then stop it
    async with TaskGroup() as group:
        await group.spawn(mempool.keep_synchronized, event)
        await event.wait()
        await group.cancel_remaining()

    # Check the default dict is handled properly - querying an unknown
    # hashX must not add an entry
    prior_len = len(mempool.hashXs)
    assert await mempool.balance_delta(os.urandom(HASHX_LEN)) == 0
    assert prior_len == len(mempool.hashXs)

    # Test all hashXs
    deltas = api.balance_deltas()
    for hashX in api.hashXs:
        expected = deltas.get(hashX, 0)
        assert await mempool.balance_delta(hashX) == expected
@pytest.mark.asyncio
async def test_compact_fee_histogram():
    '''The histogram is empty before compaction; afterwards fee rates are
    strictly decreasing and each bin holds close to bin_size in weight.'''
    api = API()
    api.initialize()
    pool = MemPool(coin, api)
    synced = Event()
    async with TaskGroup() as group:
        await group.spawn(pool.keep_synchronized, synced)
        await synced.wait()
        await group.cancel_remaining()

    assert await pool.compact_fee_histogram() == []
    bin_size = 1000
    pool._update_histogram(bin_size)
    histogram = await pool.compact_fee_histogram()
    assert histogram
    rates = [rate for rate, _size in histogram]
    sizes = [size for _rate, size in histogram]
    # Fee rates must strictly decrease from one entry to the next
    for earlier, later in zip(rates, rates[1:]):
        assert later < earlier
    # Every bin must be close to full
    for size in sizes:
        assert size > bin_size * 0.95
@pytest.mark.asyncio
async def test_potential_spends():
    '''potential_spends covers every known spend for each hashX and does not
    pollute the hashX defaultdict on unknown keys.'''
    api = API()
    api.initialize()
    pool = MemPool(coin, api)
    synced = Event()
    async with TaskGroup() as group:
        await group.spawn(pool.keep_synchronized, synced)
        await synced.wait()
        await group.cancel_remaining()

    # Querying an unknown hashX must not insert a key into the defaultdict
    count_before = len(pool.hashXs)
    assert await pool.potential_spends(os.urandom(HASHX_LEN)) == set()
    assert len(pool.hashXs) == count_before
    # Each hashX's known spends are a subset of its potential spends
    spends = api.spends()
    for hashX in api.hashXs:
        potential = await pool.potential_spends(hashX)
        assert set(spends[hashX]) <= potential
async def _test_summaries(mempool, api):
    '''Assert mempool transaction summaries agree with the API's expectations
    for every hashX, ignoring ordering.'''
    expected = api.summaries()
    for hashX in api.hashXs:
        summaries = await mempool.transaction_summaries(hashX)
        got = {(item.hash, item.fee, item.has_unconfirmed_inputs)
               for item in summaries}
        assert got == set(expected.get(hashX, []))
@pytest.mark.asyncio
async def test_transaction_summaries(caplog):
    '''transaction_summaries matches the API, handles unknown hashXs without
    polluting the defaultdict, and drops no transactions.'''
    api = API()
    api.initialize()
    pool = MemPool(coin, api)
    synced = Event()
    with caplog.at_level(logging.INFO):
        async with TaskGroup() as group:
            await group.spawn(pool.keep_synchronized, synced)
            await synced.wait()
            await group.cancel_remaining()

    # Querying an unknown hashX must not insert a key into the defaultdict
    count_before = len(pool.hashXs)
    assert await pool.transaction_summaries(os.urandom(HASHX_LEN)) == []
    assert len(pool.hashXs) == count_before

    await _test_summaries(pool, api)
    assert not in_caplog(caplog, 'txs dropped')
@pytest.mark.asyncio
async def test_unordered_UTXOs():
    '''unordered_UTXOs returns exactly the API's UTXOs for each hashX and
    does not pollute the hashX defaultdict on unknown keys.'''
    api = API()
    api.initialize()
    pool = MemPool(coin, api)
    synced = Event()
    async with TaskGroup() as group:
        await group.spawn(pool.keep_synchronized, synced)
        await synced.wait()
        await group.cancel_remaining()

    # Querying an unknown hashX must not insert a key into the defaultdict
    count_before = len(pool.hashXs)
    assert await pool.unordered_UTXOs(os.urandom(HASHX_LEN)) == []
    assert len(pool.hashXs) == count_before
    # The UTXO sets must agree for every known hashX
    expected = api.UTXOs()
    for hashX in api.hashXs:
        got = await pool.unordered_UTXOs(hashX)
        assert set(got) == set(expected.get(hashX, []))
@pytest.mark.asyncio
async def test_mempool_removals():
    '''Removing txs from the daemon's mempool must prune the MemPool's
    state: summaries track the API, emptied hashX buckets are deleted,
    and a fully-drained mempool leaves no residual entries.'''
    api = API()
    api.initialize()
    # Short refresh so the mempool re-polls the API quickly after mutation
    mempool = MemPool(coin, api, refresh_secs=0.01)
    event = Event()
    async with TaskGroup() as group:
        await group.spawn(mempool.keep_synchronized, event)
        # NOTE(review): each wait presumably corresponds to one completed
        # refresh cycle (the event appears to be re-set per sync) — confirm
        # against MemPool.keep_synchronized.
        await event.wait()
        # Remove half the TXs from the mempool
        start = len(api.ordered_adds) // 2
        for tx_hash in api.ordered_adds[start:]:
            del api.txs[tx_hash]
            del api.raw_txs[tx_hash]
        await event.wait()
        await _test_summaries(mempool, api)
        # Removed hashXs should have key destroyed
        assert all(mempool.hashXs.values())
        # Remove the rest
        api.txs.clear()
        api.raw_txs.clear()
        await event.wait()
        await _test_summaries(mempool, api)
        # Nothing left: both the hashX index and the tx map must be empty
        assert not mempool.hashXs
        assert not mempool.txs
        await group.cancel_remaining()
@pytest.mark.asyncio
async def test_daemon_drops_txs():
    '''The mempool must cope when the daemon drops transactions between
    returning their hashes and serving the raw transactions.'''
    api = DropAPI(10)
    api.initialize()
    pool = MemPool(coin, api, refresh_secs=0.01)
    synced = Event()
    async with TaskGroup() as group:
        await group.spawn(pool.keep_synchronized, synced)
        await synced.wait()
        await _test_summaries(pool, api)
        await group.cancel_remaining()
@pytest.mark.asyncio
async def test_notifications():
    '''Verify on_mempool notifications are incremental across a cycle of:
    1) a first batch of txs entering the mempool,
    2) a second batch entering, and
    3) a block confirming the first batch only.
    '''
    api = API()
    api.initialize()
    # Tiny refresh/log intervals so each state change is picked up promptly
    mempool = MemPool(coin, api, refresh_secs=0.001, log_status_secs=0)
    event = Event()
    # Split the generated txs into two batches and precompute the hashXs
    # each batch touches, for comparison against the notifications.
    n = len(api.ordered_adds) // 2
    raw_txs = api.raw_txs.copy()
    txs = api.txs.copy()
    first_hashes = api.ordered_adds[:n]
    first_touched = api.touched(first_hashes)
    second_hashes = api.ordered_adds[n:]
    second_touched = api.touched(second_hashes)
    async with TaskGroup() as group:
        # First batch enters the mempool
        api.raw_txs = {hash: raw_txs[hash] for hash in first_hashes}
        api.txs = {hash: txs[hash] for hash in first_hashes}
        first_utxos = api.mempool_utxos()
        first_spends = api.mempool_spends()
        await group.spawn(mempool.keep_synchronized, event)
        await event.wait()
        # Exactly one notification, covering only the first batch
        assert len(api.on_mempool_calls) == 1
        touched, height = api.on_mempool_calls[0]
        assert height == api._height == api._cached_height
        assert touched == first_touched
        # Second batch enters the mempool
        api.raw_txs = raw_txs
        api.txs = txs
        await event.wait()
        assert len(api.on_mempool_calls) == 2
        touched, height = api.on_mempool_calls[1]
        assert height == api._height == api._cached_height
        # Touched is incremental
        assert touched == second_touched
        # Block found; first half confirm: move the first batch's outputs
        # into the DB UTXO set and remove the UTXOs it spent.
        new_height = 2
        api._height = new_height
        api.db_utxos.update(first_utxos)
        for spend in first_spends:
            del api.db_utxos[spend]
        api.raw_txs = {hash: raw_txs[hash] for hash in second_hashes}
        api.txs = {hash: txs[hash] for hash in second_hashes}
        await event.wait()
        # The confirmed first batch is reported as touched once more
        assert len(api.on_mempool_calls) == 3
        touched, height = api.on_mempool_calls[2]
        assert height == api._height == api._cached_height == new_height
        assert touched == first_touched
        await group.cancel_remaining()
@pytest.mark.asyncio
async def test_dropped_txs(caplog):
    '''A mempool tx whose input tx vanishes from the daemon's mempool must
    be reported via a "txs dropped" log line.'''
    api = API()
    api.initialize()
    mempool = MemPool(coin, api)
    event = Event()
    # Remove a single TX_HASH that is used in another mempool tx.
    # The `break` matches the stated intent: without it, every mempool-spent
    # prev tx was deleted, not just one.
    # NOTE(review): api.raw_txs deliberately retains the entry so the hash is
    # still advertised by the daemon stub — confirm against API.initialize.
    for prev_hash, prev_idx in api.mempool_spends():
        if prev_hash in api.txs:
            del api.txs[prev_hash]
            break
    with caplog.at_level(logging.INFO):
        async with TaskGroup() as group:
            await group.spawn(mempool.keep_synchronized, event)
            await event.wait()
            await group.cancel_remaining()

    assert in_caplog(caplog, 'txs dropped')