From 094fd44fc2d003ac99c1093d44f01c772b240728 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Wed, 9 Nov 2016 08:21:24 +0900 Subject: [PATCH 1/5] Merge UTXO cache into BlockProcessor --- server/block_processor.py | 268 ++++++++++++++++++++++++++++++++++---- server/cache.py | 257 ------------------------------------ server/db.py | 4 +- 3 files changed, 247 insertions(+), 282 deletions(-) delete mode 100644 server/cache.py diff --git a/server/block_processor.py b/server/block_processor.py index 1eee6b1..aac800e 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -18,7 +18,6 @@ from bisect import bisect_left from collections import defaultdict from functools import partial -from server.cache import UTXOCache, NO_CACHE_ENTRY from server.daemon import Daemon, DaemonError from lib.hash import hash_to_str from lib.tx import Deserializer @@ -26,6 +25,13 @@ from lib.util import chunks, LoggedClass import server.db from server.storage import open_db +# Limits single address history to ~ 65536 * HIST_ENTRIES_PER_KEY entries +HIST_ENTRIES_PER_KEY = 1024 +HIST_VALUE_BYTES = HIST_ENTRIES_PER_KEY * 4 +ADDR_TX_HASH_LEN = 4 +UTXO_TX_HASH_LEN = 4 +NO_HASH_168 = bytes([255]) * 21 +NO_CACHE_ENTRY = NO_HASH_168 + bytes(12) def formatted_time(t): '''Return a number of seconds as a string in days, hours, mins and @@ -208,7 +214,7 @@ class MemPool(LoggedClass): # The mempool is unordered, so process all outputs first so # that looking for inputs has full info. script_hash168 = self.bp.coin.hash168_from_script - utxo_lookup = self.bp.utxo_cache.lookup + utxo_lookup = self.bp.utxo_lookup def txout_pair(txout): return (script_hash168(txout.pk_script), txout.value) @@ -347,10 +353,15 @@ class BlockProcessor(server.db.DB): self.last_flush_tx_count = self.tx_count # Caches of unflushed items - self.utxo_cache = UTXOCache(self.get_tx_hash, self.db, self.coin) self.headers = [] self.tx_hashes = [] + # UTXO cache + self.utxo_cache = {} + self.db_cache = {} + self.utxo_cache_spends = 0 + self.db_deletes = 0 + # Log state self.logger.info('{}/{} height: {:,d} tx count: {:,d} ' 'flush count: {:,d} utxo flush count: {:,d} ' @@ -530,22 +541,12 @@ class BlockProcessor(server.db.DB): } batch.put(b'state', repr(state).encode()) - def flush_utxos(self, batch): - self.logger.info('flushing UTXOs: {:,d} txs and {:,d} blocks' - .format(self.tx_count - self.db_tx_count, - self.height - self.db_height)) - self.utxo_cache.flush(batch) - self.utxo_flush_count = self.flush_count - self.db_tx_count = self.tx_count - self.db_height = self.height - self.db_tip = self.tip - def assert_flushed(self): '''Asserts state is fully flushed.''' assert self.tx_count == self.db_tx_count assert not self.history - assert not self.utxo_cache.cache - assert not self.utxo_cache.db_cache + assert not self.utxo_cache + assert not self.db_cache def flush(self, flush_utxos=False, flush_history=None): '''Flush out cached state. @@ -711,8 +712,8 @@ class BlockProcessor(server.db.DB): # whatever reason Python O/S mem usage is typically +30% or # more, so we scale our already bloated object sizes. one_MB = int(1048576 / 1.3) - utxo_cache_size = len(self.utxo_cache.cache) * 187 - db_cache_size = len(self.utxo_cache.db_cache) * 105 + utxo_cache_size = len(self.utxo_cache) * 187 + db_cache_size = len(self.db_cache) * 105 hist_cache_size = len(self.history) * 180 + self.history_size * 4 utxo_MB = (db_cache_size + utxo_cache_size) // one_MB hist_MB = hist_cache_size // one_MB @@ -721,8 +722,8 @@ class BlockProcessor(server.db.DB): .format(self.height, self.daemon.cached_height())) self.logger.info(' entries: UTXO: {:,d} DB: {:,d} ' 'hist addrs: {:,d} hist size {:,d}' - .format(len(self.utxo_cache.cache), - len(self.utxo_cache.db_cache), + .format(len(self.utxo_cache), + len(self.db_cache), len(self.history), self.history_size)) self.logger.info(' size: {:,d}MB (UTXOs {:,d}MB hist {:,d}MB)' @@ -779,8 +780,8 @@ class BlockProcessor(server.db.DB): self.touched.update(touched) def advance_txs(self, tx_hashes, txs, touched): - put_utxo = self.utxo_cache.put - spend_utxo = self.utxo_cache.spend + put_utxo = self.utxo_cache.__setitem__ + spend_utxo = self.spend_utxo undo_info = [] # Use local vars for speed in the loops @@ -861,8 +862,8 @@ class BlockProcessor(server.db.DB): # Use local vars for speed in the loops pack = struct.pack - put_utxo = self.utxo_cache.put - spend_utxo = self.utxo_cache.spend + put_utxo = self.utxo_cache.__setitem__ + spend_utxo = self.spend_utxo rtxs = reversed(txs) rtx_hashes = reversed(tx_hashes) @@ -885,6 +886,227 @@ class BlockProcessor(server.db.DB): assert n == 0 self.tx_count -= len(txs) + '''An in-memory UTXO cache, representing all changes to UTXO state + since the last DB flush. + + We want to store millions, perhaps 10s of millions of these in + memory for optimal performance during initial sync, because then + it is possible to spend UTXOs without ever going to the database + (other than as an entry in the address history, and there is only + one such entry per TX not per UTXO). So store them in a Python + dictionary with binary keys and values. + + Key: TX_HASH + TX_IDX (32 + 2 = 34 bytes) + Value: HASH168 + TX_NUM + VALUE (21 + 4 + 8 = 33 bytes) + + That's 67 bytes of raw data. Python dictionary overhead means + each entry actually uses about 187 bytes of memory. So almost + 11.5 million UTXOs can fit in 2GB of RAM. There are approximately + 42 million UTXOs on bitcoin mainnet at height 433,000. + + Semantics: + + add: Add it to the cache dictionary. + spend: Remove it if in the cache dictionary. + Otherwise it's been flushed to the DB. Each UTXO + is responsible for two entries in the DB stored using + compressed keys. Mark both for deletion in the next + flush of the in-memory UTXO cache. + + A UTXO is stored in the DB in 2 "tables": + + 1. The output value and tx number. Must be keyed with a + hash168 prefix so the unspent outputs and balance of an + arbitrary address can be looked up with a simple key + traversal. + Key: b'u' + hash168 + compressed_tx_hash + tx_idx + Value: a (tx_num, value) pair + + 2. Given a prevout, we need to be able to look up the UTXO key + to remove it. As is keyed by hash168 and that is not part + of the prevout, we need a hash168 lookup. + Key: b'h' + compressed tx_hash + tx_idx + Value: (hash168, tx_num) pair + + The compressed TX hash is just the first few bytes of the hash of + the TX the UTXO is in (and needn't be the same number of bytes in + each table). As this is not unique there will be collisions; + tx_num is stored to resolve them. The collision rate is around + 0.02% for the hash168 table, and almost zero for the UTXO table + (there are around 100 collisions in the whole bitcoin blockchain). + ''' + + def utxo_lookup(self, prev_hash, prev_idx): + '''Given a prevout, return a pair (hash168, value). + + If the UTXO is not found, returns (None, None).''' + # Fast track is it being in the cache + idx_packed = struct.pack(' Date: Thu, 10 Nov 2016 22:49:20 +0900 Subject: [PATCH 2/5] Catch exceptions when trying to listen Fixes #11 --- server/protocol.py | 39 ++++++++++++++++++++------------------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/server/protocol.py b/server/protocol.py index 5f5ae54..d671655 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -47,41 +47,42 @@ class BlockServer(BlockProcessor): await self.start_servers() ElectrumX.notify(self.height, self.touched) + async def start_server(self, name, protocol, host, port, *, ssl=None): + loop = asyncio.get_event_loop() + server = loop.create_server(protocol, host, port, ssl=ssl) + try: + self.servers.append(await server) + except asyncio.CancelledError: + raise + except Exception as e: + self.logger.error('{} server failed to listen on {}:{:d} :{}' + .format(name, host, port, e)) + else: + self.logger.info('{} server listening on {}:{:d}' + .format(name, host, port)) + async def start_servers(self): '''Start listening on RPC, TCP and SSL ports. Does not start a server if the port wasn't specified. ''' env = self.env - loop = asyncio.get_event_loop() - JSONRPC.init(self, self.daemon, self.coin) protocol = LocalRPC if env.rpc_port is not None: - host = 'localhost' - rpc_server = loop.create_server(protocol, host, env.rpc_port) - self.servers.append(await rpc_server) - self.logger.info('RPC server listening on {}:{:d}' - .format(host, env.rpc_port)) + await self.start_server('RPC', protocol, 'localhost', env.rpc_port) protocol = partial(ElectrumX, env) if env.tcp_port is not None: - tcp_server = loop.create_server(protocol, env.host, env.tcp_port) - self.servers.append(await tcp_server) - self.logger.info('TCP server listening on {}:{:d}' - .format(env.host, env.tcp_port)) + await self.start_server('TCP', protocol, env.host, env.tcp_port) if env.ssl_port is not None: # FIXME: update if we want to require Python >= 3.5.3 - ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) - ssl_context.load_cert_chain(env.ssl_certfile, - keyfile=env.ssl_keyfile) - ssl_server = loop.create_server(protocol, env.host, env.ssl_port, - ssl=ssl_context) - self.servers.append(await ssl_server) - self.logger.info('SSL server listening on {}:{:d}' - .format(env.host, env.ssl_port)) + sslc = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) + sslc.load_cert_chain(env.ssl_certfile, keyfile=env.ssl_keyfile) + await self.start_server('SSL', protocol, env.host, env.ssl_port, + ssl=sslc) def stop(self): '''Close the listening servers.''' From c6b5c577ab0e8e70f76af5e968fcf7370b1cfb1f Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 10 Nov 2016 23:26:11 +0900 Subject: [PATCH 3/5] More robust daemon error handling Fixes #6 --- server/daemon.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/server/daemon.py b/server/daemon.py index e94af41..7a886fe 100644 --- a/server/daemon.py +++ b/server/daemon.py @@ -91,6 +91,10 @@ class Daemon(util.LoggedClass): msg = 'connection problem - is your daemon running?' except DaemonWarmingUpError: msg = 'still starting up checking blocks...' + except (asyncio.CancelledError, DaemonError): + raise + except Exception as e: + msg = ('request gave unexpected error: {}'.format(e)) if msg != prior_msg or count == 10: self.logger.error('{}. Retrying between sleeps...' From 56098e0c52d35ddd2118d8e608312ab4c1f07657 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 10 Nov 2016 23:50:55 +0900 Subject: [PATCH 4/5] Show up to 3 missing UTXOs Fixes #15 --- server/block_processor.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/server/block_processor.py b/server/block_processor.py index aac800e..288f558 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -181,7 +181,7 @@ class MemPool(LoggedClass): ''' hex_hashes = set(hex_hashes) touched = set() - missing_utxos = 0 + missing_utxos = [] initial = self.count < 0 if initial: @@ -235,6 +235,7 @@ class MemPool(LoggedClass): if entry == NO_CACHE_ENTRY: # This happens when the daemon is a block ahead of us # and has mempool txs spending new txs in that block + missing_utxos.append(txin) raise MissingUTXOError value, = struct.unpack(' Date: Thu, 10 Nov 2016 23:57:13 +0900 Subject: [PATCH 5/5] Prepare 0.2.3 release --- docs/ARCHITECTURE.rst | 35 ++++++++++++++--------------------- docs/RELEASE-NOTES | 7 +++++++ server/version.py | 2 +- 3 files changed, 22 insertions(+), 22 deletions(-) diff --git a/docs/ARCHITECTURE.rst b/docs/ARCHITECTURE.rst index 98c43c4..81d9800 100644 --- a/docs/ARCHITECTURE.rst +++ b/docs/ARCHITECTURE.rst @@ -15,13 +15,13 @@ The components of the server are roughly like this:: - ElectrumX -<<<<<- LocalRPC - ------------- ------------ < > - ---------- ------------------- -------------- - - Daemon -<<<<<<<<- Block processor ->>>>- UTXO Cache - - ---------- ------------------- -------------- - < < > < - -------------- ---------------- - - Prefetcher - - FS + Storage - - -------------- ---------------- + ---------- ------------------- + - Daemon -<<<<<<<<- Block processor - + ---------- ------------------- + < < > + -------------- ----------- + - Prefetcher - - FS + DB - + -------------- ----------- Env @@ -60,22 +60,15 @@ Block Processor Responsible for managing block chain state (UTXO set, history, transaction and undo information) and processing towards the chain -tip. Uses the caches for in-memory state caching. Flushes state to -the storage layer. Reponsible for handling block chain -reorganisations. Once caught up maintains a representation of daemon -mempool state. +tip. Uses the caches for in-memory state updates since the last +flush. Flushes state to the storage layer. Reponsible for handling +block chain reorganisations. Once caught up maintains a +representation of daemon mempool state. -Caches ------- +Database +-------- -The file system cache and the UTXO cache are implementation details of -the block processor, nothing else should interface with them. - -Storage -------- - -Backend database abstraction. Along with the host filesystem, used by -the block processor (and therefore its caches) to store chain state. +The database. Along with the host filesystem stores flushed chain state. Prefetcher ---------- diff --git a/docs/RELEASE-NOTES b/docs/RELEASE-NOTES index aea65ec..ecdc22d 100644 --- a/docs/RELEASE-NOTES +++ b/docs/RELEASE-NOTES @@ -1,3 +1,10 @@ +Version 0.2.3 +------------- + +- fixes issues #6, #11, #15 +- the UTXO cache is now merged with BlockProcessor, where it properly belongs. + cache.py no longer exists + Version 0.2.2.1 --------------- diff --git a/server/version.py b/server/version.py index e6541a2..cf6e859 100644 --- a/server/version.py +++ b/server/version.py @@ -1 +1 @@ -VERSION = "ElectrumX 0.2.2.1" +VERSION = "ElectrumX 0.2.3"