Merge branch 'release-0.6'

This commit is contained in:
Neil Booth 2016-11-17 23:10:20 +09:00
commit 8e206ca099
12 changed files with 270 additions and 224 deletions

View File

@ -50,52 +50,75 @@ testnets, of course.
Implementation Implementation
============== ==============
ElectrumX does not currently do any pruning. With luck it may never ElectrumX does not do any pruning or throwing away of history. It
become necessary. So how does it achieve a much more compact database will retain this property for as long as feasible, and I believe it is
than Electrum server, which prunes a lot of hisory, and also sync efficiently achievable for the forseeable future with plain Python.
faster?
All of the following likely play a part: So how does it achieve a much more compact database than Electrum
server, which is forced to prune hisory for busy addresses, and yet
sync roughly 2 orders of magnitude faster?
I believe all of the following play a part:
- aggressive caching and batching of DB writes - aggressive caching and batching of DB writes
- more compact representation of UTXOs, the address index, and - more compact and efficient representation of UTXOs, address index,
history. Electrum server stores full transaction hash and height and history. Electrum-Server stores full transaction hash and
for all UTXOs. In its pruned history it does the same. ElectrumX height for each UTXO, and does the same in its pruned history. In
just stores the transaction number in the linear history of contrast ElectrumX just stores the transaction number in the linear
transactions. For at least another 5 years the transaction number history of transactions. For at least another 5 years this
will fit in a 4-byte integer. ElectrumX calculates the height from transaction number will fit in a 4-byte integer, and when necessary
a simple lookup in a linear array which is stored on disk. expanding to 5 or 6 bytes is trivial. ElectrumX can determine block
ElectrumX also stores transaction hashes in a linear array on disk. height from a simple binary search of tx counts stored on disk.
- storing static append-only metadata which is indexed by position on ElectrumX stores historical transaction hashes in a linear array on
disk rather than in levelDB. It would be nice to do this for histories disk.
but I cannot think how they could be easily indexable on a filesystem. - placing static append-only metadata indexable by position on disk
- avoiding unnecessary or redundant computations rather than in levelDB. It would be nice to do this for histories
- more efficient memory usage but I cannot think of a way.
- asyncio and asynchronous prefetch of blocks. - avoiding unnecessary or redundant computations, such as converting
address hashes to human-readable ASCII strings with expensive bignum
arithmetic, and then back again.
- better choice of Python data structures giving lower memory usage as
well as faster traversal
- leveraging asyncio for asynchronous prefetch of blocks to mostly
eliminate CPU idling. As a Python program ElectrumX is unavoidably
single-threaded in its essence; we must keep that CPU core busy.
ElectrumX should not have any need of threads. Python's asyncio means ElectrumX has no (direct) use for threads and
associated complications. I cannot foresee any case where they might
be necessary.
Roadmap Roadmap Pre-1.0
======= ===============
- come up with UTXO root logic and implement it - minor code cleanups
- test a few more performance improvement ideas - minor additions of missing functionality
- implement light caching of client responses - logging improvements, mostly post-sync. Pre-sync logs seem decent.
- yield during expensive requests and/or penalize the connection - at most 1 more DB format change; I will make a weak attempt to
retain 0.6 release's DB format if possible
- provision of configurable ways to limit client connections so as to
mitigate intentional or unintentional degradation of server response
time to other clients. Based on IRC discussion this will likely be a
combination of address subscription and bandwidth limits.
Roadmap Post-1.0
================
- UTXO root logic and implementation
- improve DB abstraction so LMDB is not penalized - improve DB abstraction so LMDB is not penalized
- investigate effects of cache defaults and DB configuration defaults
on sync time and simplify / optimize the default config accordingly
- potentially move some functionality to C or C++ - potentially move some functionality to C or C++
The above are in no particular order.
Database Format Database Format
=============== ===============
The database and metadata formats of ElectrumX are certain to change The database and metadata formats of ElectrumX are likely to change.
in the future. Such a change will render old DBs unusable. For now I Such changes will render old DBs unusable. At least until 1.0 I do
do not intend to provide converters as this is still non-production not intend to provide converters; moreover from-genesis sync time to
software. Moreover from-genesis sync time is quite bearable. create a pristine database is quite tolerable.
Miscellany Miscellany

View File

@ -54,7 +54,7 @@ you set the sum of these to nothing over half your available physical
RAM: RAM:
HIST_MB - amount of history cache, in MB, to retain before flushing to HIST_MB - amount of history cache, in MB, to retain before flushing to
disk. Default is 250; probably no benefit being much larger disk. Default is 300; probably no benefit being much larger
as history is append-only and not searched. as history is append-only and not searched.
UTXO_MB - amount of UTXO and history cache, in MB, to retain before UTXO_MB - amount of UTXO and history cache, in MB, to retain before

View File

@ -102,17 +102,17 @@ Then copy the all sample scripts from the ElectrumX source tree there::
cp -R /path/to/repo/electrumx/samples/scripts ~/scripts/electrumx cp -R /path/to/repo/electrumx/samples/scripts ~/scripts/electrumx
This copies 4 things: the top level server run script, a log/ directory This copies 3 things: the top level server run script, a log/ directory
with the logger run script, an env/ directory, and a NOTES file. with the logger run script, an env/ directory.
You need to configure the environment variables under env/ to your You need to configure the environment variables under env/ to your
setup, as explained in NOTES. ElectrumX server currently takes no setup, as explained in docs/ENV-NOTES. ElectrumX server currently
command line arguments; all of its configuration is taken from its takes no command line arguments; all of its configuration is taken
environment which is set up according to env/ directory (see 'envdir' from its environment which is set up according to env/ directory (see
man page). Finally you need to change the log/run script to use the 'envdir' man page). Finally you need to change the log/run script to
directory where you want the logs to be written by multilog. The use the directory where you want the logs to be written by multilog.
directory need not exist as multilog will create it, but its parent The directory need not exist as multilog will create it, but its
directory must exist. parent directory must exist.
Now start the 'svscan' process. This will not do much as the service Now start the 'svscan' process. This will not do much as the service
directory is still empty:: directory is still empty::
@ -143,7 +143,7 @@ The sample unit file assumes that the repository is located at
change the unit file accordingly. change the unit file accordingly.
You need to set a few configuration variables in :code:`/etc/electrumx.conf`, You need to set a few configuration variables in :code:`/etc/electrumx.conf`,
see `samples/NOTES` for the list of required variables. see `docs/ENV-NOTES` for the list of required variables.
Now you can start ElectrumX using :code:`systemctl`:: Now you can start ElectrumX using :code:`systemctl`::
@ -172,7 +172,7 @@ machine doing the indexing is focussing on the one task and not the
wider network. wider network.
The HIST_MB and CACHE_MB environment variables control cache sizes The HIST_MB and CACHE_MB environment variables control cache sizes
before they spill to disk; see the NOTES file under samples/scripts. before they spill to disk; see the ENV-NOTES file under docs/.
Here is my experience with the current codebase, to given heights and Here is my experience with the current codebase, to given heights and
rough wall-time:: rough wall-time::

View File

@ -1,3 +1,24 @@
version 0.6.0
-------------
- DB format has changed again. This doesn't give a performance gain
or reduction that I could measure, but is cleaner in that each table
entry is now a singleton and not an array, which I much prefer as a
cleaner solution. It may enable other goodness in the future.
- Logging is much less noisy when serving clients. In fact anything
in your logs that isn't just status updates probably is a bug that I
would like to know about. Unfortunately clean shutdown whilst
serving clients leads to massive log spew. This is harmless and I
believe because of my noob status with asyncio. I intend to fix
this in a nearby release.
- expensive client requests are intended to yield to other requests
sufficiently frequently that there should be no noticeable delays or
pauses under normal load from hog clients.
- Notifications to hog clients are now queued in sequence with their
request responses. They used to be sent immediately regardless of
pending requests which seems less than ideal.
- some trivial improvements and fixes to local RPC query output
version 0.5.1 version 0.5.1
------------- -------------

View File

@ -43,7 +43,7 @@ class RPCClient(asyncio.Protocol):
return ('{:3d}:{:02d}:{:02d}' return ('{:3d}:{:02d}:{:02d}'
.format(t // 3600, (t % 3600) // 60, t % 60)) .format(t // 3600, (t % 3600) // 60, t % 60))
fmt = ('{:<4} {:>23} {:>15} {:>5} ' fmt = ('{:<4} {:>23} {:>15} {:>7} '
'{:>7} {:>7} {:>7} {:>7} {:>5} {:>9}') '{:>7} {:>7} {:>7} {:>7} {:>5} {:>9}')
print(fmt.format('Type', 'Peer', 'Client', 'Subs', print(fmt.format('Type', 'Peer', 'Client', 'Subs',
'Recv #', 'Recv KB', 'Sent #', 'Sent KB', 'Recv #', 'Recv KB', 'Sent #', 'Sent KB',

View File

@ -19,6 +19,9 @@ from functools import partial
from server.env import Env from server.env import Env
from server.protocol import BlockServer from server.protocol import BlockServer
SUPPRESS_MESSAGES = [
'Fatal read error on socket transport',
]
def main_loop(): def main_loop():
'''Start the server.''' '''Start the server.'''
@ -34,6 +37,14 @@ def main_loop():
logging.warning('received {} signal, shutting down'.format(signame)) logging.warning('received {} signal, shutting down'.format(signame))
future.cancel() future.cancel()
def on_exception(loop, context):
'''Suppress spurious messages it appears we cannot control.'''
message = context.get('message')
if not message in SUPPRESS_MESSAGES:
if not ('task' in context and
'accept_connection2()' in repr(context.get('task'))):
loop.default_exception_handler(context)
server = BlockServer(Env()) server = BlockServer(Env())
future = asyncio.ensure_future(server.main_loop()) future = asyncio.ensure_future(server.main_loop())
@ -42,6 +53,8 @@ def main_loop():
loop.add_signal_handler(getattr(signal, signame), loop.add_signal_handler(getattr(signal, signame),
partial(on_signal, signame)) partial(on_signal, signame))
# Install exception handler
loop.set_exception_handler(on_exception)
loop.run_until_complete(future) loop.run_until_complete(future)
loop.close() loop.close()

View File

@ -1 +1 @@
250 300

View File

@ -353,9 +353,8 @@ class BlockProcessor(server.db.DB):
# UTXO cache # UTXO cache
self.utxo_cache = {} self.utxo_cache = {}
self.db_cache = {}
self.utxo_cache_spends = 0 self.utxo_cache_spends = 0
self.db_deletes = 0 self.db_deletes = []
# Log state # Log state
self.logger.info('{}/{} height: {:,d} tx count: {:,d} ' self.logger.info('{}/{} height: {:,d} tx count: {:,d} '
@ -540,7 +539,7 @@ class BlockProcessor(server.db.DB):
assert self.height == self.fs_height == self.db_height assert self.height == self.fs_height == self.db_height
assert not self.history assert not self.history
assert not self.utxo_cache assert not self.utxo_cache
assert not self.db_cache assert not self.db_deletes
def flush(self, flush_utxos=False, flush_history=None): def flush(self, flush_utxos=False, flush_history=None):
'''Flush out cached state. '''Flush out cached state.
@ -708,15 +707,16 @@ class BlockProcessor(server.db.DB):
# more, so we scale our already bloated object sizes. # more, so we scale our already bloated object sizes.
one_MB = int(1048576 / 1.3) one_MB = int(1048576 / 1.3)
utxo_cache_size = len(self.utxo_cache) * 187 utxo_cache_size = len(self.utxo_cache) * 187
db_cache_size = len(self.db_cache) * 105 db_deletes_size = len(self.db_deletes) * 61
hist_cache_size = len(self.history) * 180 + self.history_size * 4 hist_cache_size = len(self.history) * 180 + self.history_size * 4
tx_hash_size = (self.tx_count - self.fs_tx_count) * 74 tx_hash_size = (self.tx_count - self.fs_tx_count) * 74
utxo_MB = (db_cache_size + utxo_cache_size) // one_MB utxo_MB = (db_deletes_size + utxo_cache_size) // one_MB
hist_MB = (hist_cache_size + tx_hash_size) // one_MB hist_MB = (hist_cache_size + tx_hash_size) // one_MB
self.logger.info('UTXOs: {:,d} deletes: {:,d} ' self.logger.info('UTXOs: {:,d} deletes: {:,d} '
'UTXOs {:,d}MB hist {:,d}MB' 'UTXOs {:,d}MB hist {:,d}MB'
.format(len(self.utxo_cache), self.db_deletes, .format(len(self.utxo_cache),
len(self.db_deletes) // 2,
utxo_MB, hist_MB)) utxo_MB, hist_MB))
self.logger.info('our height: {:,d} daemon height: {:,d}' self.logger.info('our height: {:,d} daemon height: {:,d}'
.format(self.height, self.daemon.cached_height())) .format(self.height, self.daemon.cached_height()))
@ -915,17 +915,18 @@ class BlockProcessor(server.db.DB):
To this end we maintain two "tables", one for each point above: To this end we maintain two "tables", one for each point above:
1. Key: b'u' + address_hash168 + tx_num + tx_idx 1. Key: b'u' + address_hash168 + tx_idx + tx_num
Value: the UTXO value as a 64-bit unsigned integer Value: the UTXO value as a 64-bit unsigned integer
2. Key: b'h' + compressed_tx_hash + tx_idx 2. Key: b'h' + compressed_tx_hash + tx_idx + tx_num
Value: [address_hash168 + tx_num] Value: hash168
The compressed tx hash is just the first few bytes of the hash of The compressed tx hash is just the first few bytes of the hash of
the tx in which the UTXO was created. As this is not unique there the tx in which the UTXO was created. As this is not unique there
will are potential collisions when saving and looking up UTXOs; will be potential collisions so tx_num is also in the key. When
hence why the second table has a list as its value. The collision looking up a UTXO the prefix space of the compressed hash needs to
can be resolved with the tx_num. The collision rate is low (<0.1%). be searched and resolved if necessary with the tx_num. The
collision rate is low (<0.1%).
''' '''
def spend_utxo(self, tx_hash, tx_idx): def spend_utxo(self, tx_hash, tx_idx):
@ -942,55 +943,36 @@ class BlockProcessor(server.db.DB):
self.utxo_cache_spends += 1 self.utxo_cache_spends += 1
return cache_value return cache_value
# Spend it from the DB. Read the UTXO through the cache # Spend it from the DB.
# because compressed keys can collide.
# The 4 is the COMPRESSED_TX_HASH_LEN
db_key = b'h' + tx_hash[:4] + idx_packed
db_value = self.db_cache_get(db_key)
if db_value:
# FIXME: this matches what we did previously but until we store
# all UTXOs isn't safe
if len(db_value) == 25:
udb_key = b'u' + db_value + idx_packed
utxo_value_packed = self.db.get(udb_key)
if utxo_value_packed:
# Remove the UTXO from both tables
self.db_deletes += 1
self.db_cache[db_key] = None
self.db_cache[udb_key] = None
return db_value + utxo_value_packed
# Fall through to below loop for error
assert len(db_value) % 25 == 0 # Key: b'h' + compressed_tx_hash + tx_idx + tx_num
# Value: hash168
prefix = b'h' + tx_hash[:4] + idx_packed
candidates = {db_key: hash168 for db_key, hash168
in self.db.iterator(prefix=prefix)}
# Find which entry, if any, the TX_HASH matches. for hdb_key, hash168 in candidates.items():
for n in range(0, len(db_value), 25): tx_num_packed = hdb_key[-4:]
tx_num, = unpack('<I', db_value[n + 21:n + 25])
if len(candidates) > 1:
tx_num, = unpack('<I', tx_num_packed)
hash, height = self.get_tx_hash(tx_num) hash, height = self.get_tx_hash(tx_num)
if hash == tx_hash: if hash != tx_hash:
match = db_value[n:n+25] continue
udb_key = b'u' + match + idx_packed
utxo_value_packed = self.db.get(udb_key)
if utxo_value_packed:
# Remove the UTXO from both tables
self.db_deletes += 1
self.db_cache[db_key] = db_value[:n] + db_value[n+25:]
self.db_cache[udb_key] = None
return match + utxo_value_packed
raise self.DBError('UTXO {} / {:,d} not found in "u" table' # Key: b'u' + address_hash168 + tx_idx + tx_num
.format(hash_to_str(tx_hash), tx_idx)) # Value: the UTXO value as a 64-bit unsigned integer
udb_key = b'u' + hash168 + hdb_key[-6:]
utxo_value_packed = self.db.get(udb_key)
if utxo_value_packed:
# Remove both entries for this UTXO
self.db_deletes.append(hdb_key)
self.db_deletes.append(udb_key)
return hash168 + tx_num_packed + utxo_value_packed
raise ChainError('UTXO {} / {:,d} not found in "h" table' raise ChainError('UTXO {} / {:,d} not found in "h" table'
.format(hash_to_str(tx_hash), tx_idx)) .format(hash_to_str(tx_hash), tx_idx))
def db_cache_get(self, key):
'''Fetch a 'h' value from the DB through our write cache.'''
value = self.db_cache.get(key)
if value:
return value
return self.db.get(key)
def flush_utxos(self, batch): def flush_utxos(self, batch):
'''Flush the cached DB writes and UTXO set to the batch.''' '''Flush the cached DB writes and UTXO set to the batch.'''
# Care is needed because the writes generated by flushing the # Care is needed because the writes generated by flushing the
@ -1004,40 +986,24 @@ class BlockProcessor(server.db.DB):
'DB spends: {:,d}' 'DB spends: {:,d}'
.format(len(self.utxo_cache) + self.utxo_cache_spends, .format(len(self.utxo_cache) + self.utxo_cache_spends,
self.utxo_cache_spends, self.utxo_cache_spends,
self.db_deletes)) len(self.db_deletes) // 2))
collisions = 0 batch_delete = batch.delete
new_utxos = len(self.utxo_cache) for key in self.db_deletes:
batch_delete(key)
self.db_deletes = []
batch_put = batch.put
for cache_key, cache_value in self.utxo_cache.items(): for cache_key, cache_value in self.utxo_cache.items():
# Frist write to the hash168 lookup table # suffix = tx_num + tx_idx
# The 4 is the COMPRESSED_TX_HASH_LEN hash168 = cache_value[:21]
db_key = b'h' + cache_key[:4] + cache_key[-2:] suffix = cache_key[-2:] + cache_value[21:25]
prior_value = self.db_cache_get(db_key) batch_put(b'h' + cache_key[:4] + suffix, hash168)
if prior_value: # Should rarely happen batch_put(b'u' + hash168 + suffix, cache_value[25:])
collisions += 1
self.db_cache[db_key] = prior_value + cache_value[:25]
else:
self.db_cache[db_key] = cache_value[:25]
# Next write the UTXO table
db_key = b'u' + cache_value[:25] + cache_key[-2:]
self.db_cache[db_key] = cache_value[-8:]
# GC-ing this now can only help the levelDB write.
self.utxo_cache = {} self.utxo_cache = {}
self.db_deletes = []
# Now we can update to the batch. self.utxo_cache_spends = 0
for key, value in self.db_cache.items():
if value:
batch.put(key, value)
else: # b'' or None
batch.delete(key)
adds = new_utxos + self.utxo_cache_spends
self.db_cache = {}
self.utxo_cache_spends = self.db_deletes = 0
self.utxo_flush_count = self.flush_count self.utxo_flush_count = self.flush_count
self.db_tx_count = self.tx_count self.db_tx_count = self.tx_count
self.db_height = self.height self.db_height = self.height

View File

@ -29,7 +29,7 @@ class DB(LoggedClass):
it was shutdown uncleanly. it was shutdown uncleanly.
''' '''
VERSIONS = [2] VERSIONS = [3]
class MissingUTXOError(Exception): class MissingUTXOError(Exception):
'''Raised if a mempool tx input UTXO couldn't be found.''' '''Raised if a mempool tx input UTXO couldn't be found.'''
@ -198,21 +198,18 @@ class DB(LoggedClass):
''' '''
limit = self._resolve_limit(limit) limit = self._resolve_limit(limit)
s_unpack = unpack s_unpack = unpack
# Key: b'u' + address_hash168 + tx_idx + tx_num
# Value: the UTXO value as a 64-bit unsigned integer
prefix = b'u' + hash168 prefix = b'u' + hash168
for db_key, db_value in self.db.iterator(prefix=prefix): for db_key, db_value in self.db.iterator(prefix=prefix):
if limit == 0: if limit == 0:
return return
limit -= 1 limit -= 1
tx_num, tx_pos = s_unpack('<IH', db_key[-6:]) tx_num, tx_pos = s_unpack('<HI', db_key[-6:])
value, = unpack('<Q', db_value) value, = unpack('<Q', db_value)
tx_hash, height = self.fs_tx_hash(tx_num) tx_hash, height = self.fs_tx_hash(tx_num)
yield UTXO(tx_num, tx_pos, tx_hash, height, value) yield UTXO(tx_num, tx_pos, tx_hash, height, value)
def get_utxos_sorted(self, hash168):
'''Returns all the UTXOs for an address sorted by height and
position in the block.'''
return sorted(self.get_utxos(hash168, limit=None))
def get_utxo_hash168(self, tx_hash, index): def get_utxo_hash168(self, tx_hash, index):
'''Returns the hash168 for a UTXO. '''Returns the hash168 for a UTXO.
@ -228,19 +225,19 @@ class DB(LoggedClass):
'''Return (hash168, tx_num_packed) for the given TXO. '''Return (hash168, tx_num_packed) for the given TXO.
Both are None if not found.''' Both are None if not found.'''
# The 4 is the COMPRESSED_TX_HASH_LEN # Key: b'h' + compressed_tx_hash + tx_idx + tx_num
key = b'h' + tx_hash[:4] + idx_packed # Value: hash168
db_value = self.db.get(key) prefix = b'h' + tx_hash[:4] + idx_packed
if db_value:
assert len(db_value) % 25 == 0
# Find which entry, if any, the TX_HASH matches. # Find which entry, if any, the TX_HASH matches.
for n in range(0, len(db_value), 25): for db_key, hash168 in self.db.iterator(prefix=prefix):
tx_num_packed = db_value[n + 21: n + 25] assert len(hash168) == 21
tx_num, = unpack('<I', tx_num_packed)
hash, height = self.fs_tx_hash(tx_num) tx_num_packed = db_key[-4:]
if hash == tx_hash: tx_num, = unpack('<I', tx_num_packed)
return db_value[n:n+21], tx_num_packed hash, height = self.fs_tx_hash(tx_num)
if hash == tx_hash:
return hash168, tx_num_packed
return None, None return None, None
@ -254,10 +251,12 @@ class DB(LoggedClass):
hash168, tx_num_packed = self.db_hash168(tx_hash, idx_packed) hash168, tx_num_packed = self.db_hash168(tx_hash, idx_packed)
if not hash168: if not hash168:
# This can happen when the daemon is a block ahead of us # This can happen when the daemon is a block ahead of us
# and has mempool txs spending new txs in that block # and has mempool txs spending outputs from that new block
raise self.MissingUTXOError raise self.MissingUTXOError
key = b'u' + hash168 + tx_num_packed + idx_packed # Key: b'u' + address_hash168 + tx_idx + tx_num
# Value: the UTXO value as a 64-bit unsigned integer
key = b'u' + hash168 + idx_packed + tx_num_packed
db_value = self.db.get(key) db_value = self.db.get(key)
if not db_value: if not db_value:
raise self.DBError('UTXO {} / {:,d} in one table only' raise self.DBError('UTXO {} / {:,d} in one table only'

View File

@ -27,7 +27,7 @@ class Env(LoggedClass):
self.coin = Coin.lookup_coin_class(coin_name, network) self.coin = Coin.lookup_coin_class(coin_name, network)
self.db_dir = self.required('DB_DIRECTORY') self.db_dir = self.required('DB_DIRECTORY')
self.utxo_MB = self.integer('UTXO_MB', 1000) self.utxo_MB = self.integer('UTXO_MB', 1000)
self.hist_MB = self.integer('HIST_MB', 250) self.hist_MB = self.integer('HIST_MB', 300)
self.host = self.default('HOST', 'localhost') self.host = self.default('HOST', 'localhost')
self.reorg_limit = self.integer('REORG_LIMIT', 200) self.reorg_limit = self.integer('REORG_LIMIT', 200)
self.daemon_url = self.build_daemon_url() self.daemon_url = self.build_daemon_url()

View File

@ -108,9 +108,11 @@ class ServerManager(LoggedClass):
def notify(self, height, touched): def notify(self, height, touched):
'''Notify sessions about height changes and touched addresses.''' '''Notify sessions about height changes and touched addresses.'''
sessions = [session for session in self.sessions cache = {}
if isinstance(session, ElectrumX)] for session in self.sessions:
ElectrumX.notify(sessions, height, touched) if isinstance(session, ElectrumX):
# Use a tuple to distinguish from JSON
session.jobs.put_nowait((height, touched, cache))
def stop(self): def stop(self):
'''Close listening servers.''' '''Close listening servers.'''
@ -196,7 +198,7 @@ class Session(JSONRPC):
self.coin = bp.coin self.coin = bp.coin
self.kind = kind self.kind = kind
self.hash168s = set() self.hash168s = set()
self.requests = asyncio.Queue() self.jobs = asyncio.Queue()
self.current_task = None self.current_task = None
self.client = 'unknown' self.client = 'unknown'
@ -222,26 +224,23 @@ class Session(JSONRPC):
def on_json_request(self, request): def on_json_request(self, request):
'''Queue the request for asynchronous handling.''' '''Queue the request for asynchronous handling.'''
self.requests.put_nowait(request) self.jobs.put_nowait(request)
async def serve_requests(self): async def serve_requests(self):
'''Asynchronously run through the task queue.''' '''Asynchronously run through the task queue.'''
while True: while True:
await asyncio.sleep(0) await asyncio.sleep(0)
request = await self.requests.get() job = await self.jobs.get()
try: try:
start = time.time() if isinstance(job, tuple): # Height / mempool notification
await self.handle_json_request(request) await self.notify(*job)
secs = time.time() - start else:
if secs > 1: await self.handle_json_request(job)
self.logger.warning('slow request for {} took {:.1f}s: {}'
.format(self.peername(), secs,
request))
except asyncio.CancelledError: except asyncio.CancelledError:
break break
except Exception: except Exception:
# Getting here should probably be considered a bug and fixed # Getting here should probably be considered a bug and fixed
self.logger.error('error handling request {}'.format(request)) self.logger.error('error handling request {}'.format(job))
traceback.print_exc() traceback.print_exc()
def peername(self, *, for_log=True): def peername(self, *, for_log=True):
@ -261,8 +260,8 @@ class Session(JSONRPC):
return param return param
except ValueError: except ValueError:
pass pass
raise RPCError('parameter should be a transaction hash: {}' raise self.RPCError('parameter should be a transaction hash: {}'
.format(param)) .format(param))
def hash168_from_param(self, param): def hash168_from_param(self, param):
if isinstance(param, str): if isinstance(param, str):
@ -270,7 +269,8 @@ class Session(JSONRPC):
return self.coin.address_to_hash168(param) return self.coin.address_to_hash168(param)
except: except:
pass pass
raise RPCError('parameter should be a valid address: {}'.format(param)) raise self.RPCError('parameter should be a valid address: {}'
.format(param))
def non_negative_integer_from_param(self, param): def non_negative_integer_from_param(self, param):
try: try:
@ -281,24 +281,24 @@ class Session(JSONRPC):
if param >= 0: if param >= 0:
return param return param
raise RPCError('param should be a non-negative integer: {}' raise self.RPCError('param should be a non-negative integer: {}'
.format(param)) .format(param))
def extract_hash168(self, params): def extract_hash168(self, params):
if len(params) == 1: if len(params) == 1:
return self.hash168_from_param(params[0]) return self.hash168_from_param(params[0])
raise RPCError('params should contain a single address: {}' raise self.RPCError('params should contain a single address: {}'
.format(params)) .format(params))
def extract_non_negative_integer(self, params): def extract_non_negative_integer(self, params):
if len(params) == 1: if len(params) == 1:
return self.non_negative_integer_from_param(params[0]) return self.non_negative_integer_from_param(params[0])
raise RPCError('params should contain a non-negative integer: {}' raise self.RPCError('params should contain a non-negative integer: {}'
.format(params)) .format(params))
def require_empty_params(self, params): def require_empty_params(self, params):
if params: if params:
raise RPCError('params should be empty: {}'.format(params)) raise self.RPCError('params should be empty: {}'.format(params))
class ElectrumX(Session): class ElectrumX(Session):
@ -324,36 +324,41 @@ class ElectrumX(Session):
for prefix, suffixes in rpcs for prefix, suffixes in rpcs
for suffix in suffixes.split()} for suffix in suffixes.split()}
@classmethod async def notify(self, height, touched, cache):
def notify(cls, sessions, height, touched): '''Notify the client about changes in height and touched addresses.
headers_payload = height_payload = None
for session in sessions: Cache is a shared cache for this update.
if height != session.notified_height: '''
session.notified_height = height if height != self.notified_height:
if session.subscribe_headers: self.notified_height = height
if headers_payload is None: if self.subscribe_headers:
headers_payload = json_notification_payload( key = 'headers_payload'
'blockchain.headers.subscribe', if key not in cache:
(session.electrum_header(height), ), cache[key] = json_notification_payload(
) 'blockchain.headers.subscribe',
session.send_json(headers_payload) (self.electrum_header(height), ),
)
self.send_json(cache[key])
if session.subscribe_height: if self.subscribe_height:
if height_payload is None:
height_payload = json_notification_payload(
'blockchain.numblocks.subscribe',
(height, ),
)
session.send_json(height_payload)
hash168_to_address = session.coin.hash168_to_address
for hash168 in session.hash168s.intersection(touched):
address = hash168_to_address(hash168)
status = session.address_status(hash168)
payload = json_notification_payload( payload = json_notification_payload(
'blockchain.address.subscribe', (address, status)) 'blockchain.numblocks.subscribe',
session.send_json(payload) (height, ),
)
self.send_json(payload)
hash168_to_address = self.coin.hash168_to_address
matches = self.hash168s.intersection(touched)
for hash168 in matches:
address = hash168_to_address(hash168)
status = await self.address_status(hash168)
payload = json_notification_payload(
'blockchain.address.subscribe', (address, status))
self.send_json(payload)
if matches:
self.logger.info('notified {} of {} addresses'
.format(self.peername(), len(matches)))
def height(self): def height(self):
'''Return the block processor's current height.''' '''Return the block processor's current height.'''
@ -366,15 +371,15 @@ class ElectrumX(Session):
def electrum_header(self, height): def electrum_header(self, height):
'''Return the binary header at the given height.''' '''Return the binary header at the given height.'''
if not 0 <= height <= self.height(): if not 0 <= height <= self.height():
raise RPCError('height {:,d} out of range'.format(height)) raise self.RPCError('height {:,d} out of range'.format(height))
header = self.bp.read_headers(height, 1) header = self.bp.read_headers(height, 1)
return self.coin.electrum_header(header, height) return self.coin.electrum_header(header, height)
def address_status(self, hash168): async def address_status(self, hash168):
'''Returns status as 32 bytes.''' '''Returns status as 32 bytes.'''
# Note history is ordered and mempool unordered in electrum-server # Note history is ordered and mempool unordered in electrum-server
# For mempool, height is -1 if unconfirmed txins, otherwise 0 # For mempool, height is -1 if unconfirmed txins, otherwise 0
history = self.bp.get_history(hash168) history = await self.async_get_history(hash168)
mempool = self.bp.mempool_transactions(hash168) mempool = self.bp.mempool_transactions(hash168)
status = ''.join('{}:{:d}:'.format(hash_to_str(tx_hash), height) status = ''.join('{}:{:d}:'.format(hash_to_str(tx_hash), height)
@ -407,10 +412,10 @@ class ElectrumX(Session):
return {"block_height": height, "merkle": merkle_branch, "pos": pos} return {"block_height": height, "merkle": merkle_branch, "pos": pos}
def get_history(self, hash168): async def get_history(self, hash168):
# Note history is ordered and mempool unordered in electrum-server # Note history is ordered and mempool unordered in electrum-server
# For mempool, height is -1 if unconfirmed txins, otherwise 0 # For mempool, height is -1 if unconfirmed txins, otherwise 0
history = self.bp.get_history(hash168, limit=None) history = await self.async_get_history(hash168)
mempool = self.bp.mempool_transactions(hash168) mempool = self.bp.mempool_transactions(hash168)
conf = tuple({'tx_hash': hash_to_str(tx_hash), 'height': height} conf = tuple({'tx_hash': hash_to_str(tx_hash), 'height': height}
@ -427,44 +432,61 @@ class ElectrumX(Session):
count = min(next_height - start_height, chunk_size) count = min(next_height - start_height, chunk_size)
return self.bp.read_headers(start_height, count).hex() return self.bp.read_headers(start_height, count).hex()
def get_balance(self, hash168): async def async_get_history(self, hash168):
confirmed = self.bp.get_balance(hash168) # Python 3.6: use async generators; update callers
history = []
for item in self.bp.get_history(hash168, limit=None):
history.append(item)
if len(history) % 100 == 0:
await asyncio.sleep(0)
return history
async def get_utxos(self, hash168):
# Python 3.6: use async generators; update callers
utxos = []
for utxo in self.bp.get_utxos(hash168, limit=None):
utxos.append(utxo)
if len(utxos) % 25 == 0:
await asyncio.sleep(0)
return utxos
async def get_balance(self, hash168):
utxos = await self.get_utxos(hash168)
confirmed = sum(utxo.value for utxo in utxos)
unconfirmed = self.bp.mempool_value(hash168) unconfirmed = self.bp.mempool_value(hash168)
return {'confirmed': confirmed, 'unconfirmed': unconfirmed} return {'confirmed': confirmed, 'unconfirmed': unconfirmed}
def list_unspent(self, hash168): async def list_unspent(self, hash168):
utxos = self.bp.get_utxos_sorted(hash168) return [{'tx_hash': hash_to_str(utxo.tx_hash), 'tx_pos': utxo.tx_pos,
return tuple({'tx_hash': hash_to_str(utxo.tx_hash), 'height': utxo.height, 'value': utxo.value}
'tx_pos': utxo.tx_pos, 'height': utxo.height, for utxo in sorted(await self.get_utxos(hash168))]
'value': utxo.value}
for utxo in utxos)
# --- blockchain commands # --- blockchain commands
async def address_get_balance(self, params): async def address_get_balance(self, params):
hash168 = self.extract_hash168(params) hash168 = self.extract_hash168(params)
return self.get_balance(hash168) return await self.get_balance(hash168)
async def address_get_history(self, params): async def address_get_history(self, params):
hash168 = self.extract_hash168(params) hash168 = self.extract_hash168(params)
return self.get_history(hash168) return await self.get_history(hash168)
async def address_get_mempool(self, params): async def address_get_mempool(self, params):
hash168 = self.extract_hash168(params) hash168 = self.extract_hash168(params)
raise RPCError('get_mempool is not yet implemented') raise self.RPCError('get_mempool is not yet implemented')
async def address_get_proof(self, params): async def address_get_proof(self, params):
hash168 = self.extract_hash168(params) hash168 = self.extract_hash168(params)
raise RPCError('get_proof is not yet implemented') raise self.RPCError('get_proof is not yet implemented')
async def address_listunspent(self, params): async def address_listunspent(self, params):
hash168 = self.extract_hash168(params) hash168 = self.extract_hash168(params)
return self.list_unspent(hash168) return await self.list_unspent(hash168)
async def address_subscribe(self, params): async def address_subscribe(self, params):
hash168 = self.extract_hash168(params) hash168 = self.extract_hash168(params)
self.hash168s.add(hash168) self.hash168s.add(hash168)
return self.address_status(hash168) return await self.address_status(hash168)
async def block_get_chunk(self, params): async def block_get_chunk(self, params):
index = self.extract_non_negative_integer(params) index = self.extract_non_negative_integer(params)
@ -529,7 +551,7 @@ class ElectrumX(Session):
tx_hash = self.tx_hash_from_param(params[0]) tx_hash = self.tx_hash_from_param(params[0])
return await self.daemon.getrawtransaction(tx_hash) return await self.daemon.getrawtransaction(tx_hash)
raise RPCError('params wrong length: {}'.format(params)) raise self.RPCError('params wrong length: {}'.format(params))
async def transaction_get_merkle(self, params): async def transaction_get_merkle(self, params):
if len(params) == 2: if len(params) == 2:
@ -537,7 +559,8 @@ class ElectrumX(Session):
height = self.non_negative_integer_from_param(params[1]) height = self.non_negative_integer_from_param(params[1])
return await self.tx_merkle(tx_hash, height) return await self.tx_merkle(tx_hash, height)
raise RPCError('params should contain a transaction hash and height') raise self.RPCError('params should contain a transaction hash '
'and height')
async def utxo_get_address(self, params): async def utxo_get_address(self, params):
if len(params) == 2: if len(params) == 2:
@ -549,7 +572,8 @@ class ElectrumX(Session):
return self.coin.hash168_to_address(hash168) return self.coin.hash168_to_address(hash168)
return None return None
raise RPCError('params should contain a transaction hash and index') raise self.RPCError('params should contain a transaction hash '
'and index')
# --- server commands # --- server commands

View File

@ -1 +1 @@
VERSION = "ElectrumX 0.5.1" VERSION = "ElectrumX 0.6.0"