From 1393f6a0304c16b8f47c938f15f11d27dcb1ff1a Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Tue, 8 Nov 2016 00:04:10 +0900 Subject: [PATCH 01/17] Move signal handling out of the controller --- electrumx_server.py | 37 ++++++++++++++++++++++++++++++------- server/controller.py | 13 ------------- 2 files changed, 30 insertions(+), 20 deletions(-) diff --git a/electrumx_server.py b/electrumx_server.py index 60948fa..9429883 100755 --- a/electrumx_server.py +++ b/electrumx_server.py @@ -9,16 +9,29 @@ '''Script to kick off the server.''' - import asyncio import logging import os +import signal import traceback +from functools import partial from server.env import Env from server.controller import Controller +def cancel_tasks(loop): + # Cancel and collect the remaining tasks + tasks = asyncio.Task.all_tasks() + for task in tasks: + task.cancel() + + try: + loop.run_until_complete(asyncio.gather(*tasks)) + except asyncio.CancelledError: + pass + + def main_loop(): '''Get tasks; loop until complete.''' if os.geteuid() == 0: @@ -33,16 +46,26 @@ def main_loop(): #loop.set_debug(True) controller = Controller(loop, env) - controller.start() - tasks = asyncio.Task.all_tasks(loop) + # Signal handlers + def on_signal(signame): + '''Call on receipt of a signal to cleanly shutdown.''' + logging.warning('received {} signal, preparing to shut down' + .format(signame)) + loop.stop() + + for signame in ('SIGINT', 'SIGTERM'): + loop.add_signal_handler(getattr(signal, signame), + partial(on_signal, signame)) + + controller.start() try: - loop.run_until_complete(asyncio.gather(*tasks)) - except asyncio.CancelledError: - logging.warning('task cancelled; asyncio event loop closing') + loop.run_forever() finally: controller.stop() - loop.close() + cancel_tasks(loop) + + loop.close() def main(): diff --git a/server/controller.py b/server/controller.py index 6cfd030..a30aeb4 100644 --- a/server/controller.py +++ b/server/controller.py @@ -12,7 +12,6 @@ client-serving data such as histories. ''' import asyncio -import signal import ssl from functools import partial @@ -46,11 +45,6 @@ class Controller(LoggedClass): for coro in coros: asyncio.ensure_future(coro) - # Signal handlers - for signame in ('SIGINT', 'SIGTERM'): - self.loop.add_signal_handler(getattr(signal, signame), - partial(self.on_signal, signame)) - async def on_update(self, height, touched): if not self.servers: self.servers = await self.start_servers() @@ -98,10 +92,3 @@ class Controller(LoggedClass): '''Close the listening servers.''' for server in self.servers: server.close() - - def on_signal(self, signame): - '''Call on receipt of a signal to cleanly shutdown.''' - self.logger.warning('received {} signal, preparing to shut down' - .format(signame)) - for task in asyncio.Task.all_tasks(self.loop): - task.cancel() From c0a112f8ea3a9d5bc790fdc149f90b91b62dcfe3 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Tue, 8 Nov 2016 07:32:55 +0900 Subject: [PATCH 02/17] Split out part of block processor into db.py The part that doesn't actually do any block processing... 
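In rough outline, the intended layering looks like the sketch below: a read-only DB base class that owns the database handle and the query helpers, with the block processor inheriting from it and adding the write path. This is only an illustrative sketch of the shape of the split (names follow the new server/db.py; the store is assumed to expose an iterator(prefix=...) method as in the diff), not code taken from the patch itself:

    # Illustrative sketch only: DB owns the store and the read-only queries,
    # BlockProcessor layers block processing and flushing on top of it.
    class DB:
        '''Read-only view of the on-disk chain state (sketch).'''

        def __init__(self, db):
            # An already-opened key-value store, as the real class
            # obtains from open_db().
            self.db = db

        @staticmethod
        def _resolve_limit(limit):
            # None means "no limit"
            if limit is None:
                return -1
            assert isinstance(limit, int) and limit >= 0
            return limit

        def get_history(self, hash168, limit=1000):
            '''Yield confirmed (tx_hash, height) pairs for an address.'''
            limit = self._resolve_limit(limit)
            for key, hist in self.db.iterator(prefix=b'H' + hash168):
                ...  # unpack tx numbers, stopping once the limit is reached

    class BlockProcessor(DB):
        '''Block processing, caching and flushing layered on top of DB.'''

Keeping the read-only accessors on the base class lets other components, such as query.py later in this series, open the database without dragging in the block processor.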
--- server/block_processor.py | 124 ++------------------------------- server/cache.py | 8 +-- server/db.py | 142 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 151 insertions(+), 123 deletions(-) create mode 100644 server/db.py diff --git a/server/block_processor.py b/server/block_processor.py index 6116924..1412cbc 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -22,6 +22,7 @@ from server.daemon import DaemonError from lib.hash import hash_to_str from lib.tx import Deserializer from lib.util import chunks, LoggedClass +import server.db from server.storage import open_db @@ -33,9 +34,6 @@ def formatted_time(t): t // 86400, (t % 86400) // 3600, (t % 3600) // 60, t % 60) -UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value") - - class ChainError(Exception): pass @@ -283,7 +281,7 @@ class MemPool(LoggedClass): return value -class BlockProcessor(LoggedClass): +class BlockProcessor(server.db.DB): '''Process blocks and update the DB state to match. Employ a prefetcher to prefetch blocks in batches for processing. @@ -292,9 +290,8 @@ class BlockProcessor(LoggedClass): def __init__(self, env, daemon, on_update=None): '''on_update is awaitable, and called only when caught up with the - daemon and a new block arrives or the mempool is updated. - ''' - super().__init__() + daemon and a new block arrives or the mempool is updated.''' + super().__init__(env.coin, env.db_engine) self.daemon = daemon self.on_update = on_update @@ -305,39 +302,16 @@ class BlockProcessor(LoggedClass): self.utxo_MB = env.utxo_MB self.hist_MB = env.hist_MB self.next_cache_check = 0 - self.coin = env.coin self.reorg_limit = env.reorg_limit - # Open DB and metadata files. Record some of its state. - db_name = '{}-{}'.format(self.coin.NAME, self.coin.NET) - self.db = open_db(db_name, env.db_engine) - if self.db.is_new: - self.logger.info('created new {} database {}' - .format(env.db_engine, db_name)) - else: - self.logger.info('successfully opened {} database {}' - .format(env.db_engine, db_name)) - - self.init_state() - self.tx_count = self.db_tx_count - self.height = self.db_height - self.tip = self.db_tip - - # Caches to be flushed later. 
Headers and tx_hashes have one - # entry per block + # Headers and tx_hashes have one entry per block self.history = defaultdict(partial(array.array, 'I')) self.history_size = 0 - self.utxo_cache = UTXOCache(self, self.db, self.coin) - self.fs_cache = FSCache(self.coin, self.height, self.tx_count) self.prefetcher = Prefetcher(daemon, self.height) self.last_flush = time.time() self.last_flush_tx_count = self.tx_count - # Redirected member funcs - self.get_tx_hash = self.fs_cache.get_tx_hash - self.read_headers = self.fs_cache.read_headers - # Log state self.logger.info('{}/{} height: {:,d} tx count: {:,d} ' 'flush count: {:,d} utxo flush count: {:,d} ' @@ -451,30 +425,6 @@ class BlockProcessor(LoggedClass): return self.fs_cache.block_hashes(start, count) - def init_state(self): - if self.db.is_new: - self.db_height = -1 - self.db_tx_count = 0 - self.db_tip = b'\0' * 32 - self.flush_count = 0 - self.utxo_flush_count = 0 - self.wall_time = 0 - self.first_sync = True - else: - state = self.db.get(b'state') - state = ast.literal_eval(state.decode()) - if state['genesis'] != self.coin.GENESIS_HASH: - raise ChainError('DB genesis hash {} does not match coin {}' - .format(state['genesis_hash'], - self.coin.GENESIS_HASH)) - self.db_height = state['height'] - self.db_tx_count = state['tx_count'] - self.db_tip = state['tip'] - self.flush_count = state['flush_count'] - self.utxo_flush_count = state['utxo_flush_count'] - self.wall_time = state['wall_time'] - self.first_sync = state.get('first_sync', True) - def clean_db(self): '''Clean out stale DB items. @@ -839,13 +789,6 @@ class BlockProcessor(LoggedClass): assert n == 0 self.tx_count -= len(txs) - @staticmethod - def resolve_limit(limit): - if limit is None: - return -1 - assert isinstance(limit, int) and limit >= 0 - return limit - def mempool_transactions(self, hash168): '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool entries for the hash168. @@ -860,60 +803,3 @@ class BlockProcessor(LoggedClass): Can be positive or negative. ''' return self.mempool.value(hash168) - - def get_history(self, hash168, limit=1000): - '''Generator that returns an unpruned, sorted list of (tx_hash, - height) tuples of confirmed transactions that touched the address, - earliest in the blockchain first. Includes both spending and - receiving transactions. By default yields at most 1000 entries. - Set limit to None to get them all. - ''' - limit = self.resolve_limit(limit) - prefix = b'H' + hash168 - for key, hist in self.db.iterator(prefix=prefix): - a = array.array('I') - a.frombytes(hist) - for tx_num in a: - if limit == 0: - return - yield self.get_tx_hash(tx_num) - limit -= 1 - - def get_balance(self, hash168): - '''Returns the confirmed balance of an address.''' - return sum(utxo.value for utxo in self.get_utxos(hash168, limit=None)) - - def get_utxos(self, hash168, limit=1000): - '''Generator that yields all UTXOs for an address sorted in no - particular order. By default yields at most 1000 entries. - Set limit to None to get them all. - ''' - limit = self.resolve_limit(limit) - unpack = struct.unpack - prefix = b'u' + hash168 - for k, v in self.db.iterator(prefix=prefix): - (tx_pos,) = unpack('= 0 + return limit + + def get_history(self, hash168, limit=1000): + '''Generator that returns an unpruned, sorted list of (tx_hash, + height) tuples of confirmed transactions that touched the address, + earliest in the blockchain first. Includes both spending and + receiving transactions. By default yields at most 1000 entries. 
+ Set limit to None to get them all. + ''' + limit = self._resolve_limit(limit) + prefix = b'H' + hash168 + for key, hist in self.db.iterator(prefix=prefix): + a = array.array('I') + a.frombytes(hist) + for tx_num in a: + if limit == 0: + return + yield self.get_tx_hash(tx_num) + limit -= 1 + + def get_balance(self, hash168): + '''Returns the confirmed balance of an address.''' + return sum(utxo.value for utxo in self.get_utxos(hash168, limit=None)) + + def get_utxos(self, hash168, limit=1000): + '''Generator that yields all UTXOs for an address sorted in no + particular order. By default yields at most 1000 entries. + Set limit to None to get them all. + ''' + limit = self._resolve_limit(limit) + unpack = struct.unpack + prefix = b'u' + hash168 + for k, v in self.db.iterator(prefix=prefix): + (tx_pos,) = unpack(' Date: Tue, 8 Nov 2016 08:09:59 +0900 Subject: [PATCH 03/17] Create BlockServer Controller now an empty shell --- server/block_processor.py | 71 +++++++++++++++++++++++++++++++++++---- server/controller.py | 56 ++---------------------------- 2 files changed, 67 insertions(+), 60 deletions(-) diff --git a/server/block_processor.py b/server/block_processor.py index 1412cbc..c58284a 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -11,6 +11,7 @@ import array import ast import asyncio +import ssl import struct import time from bisect import bisect_left @@ -19,6 +20,7 @@ from functools import partial from server.cache import FSCache, UTXOCache, NO_CACHE_ENTRY from server.daemon import DaemonError +from server.protocol import ElectrumX, LocalRPC, JSONRPC from lib.hash import hash_to_str from lib.tx import Deserializer from lib.util import chunks, LoggedClass @@ -288,13 +290,14 @@ class BlockProcessor(server.db.DB): Coordinate backing up in case of chain reorganisations. ''' - def __init__(self, env, daemon, on_update=None): + def __init__(self, env, daemon): '''on_update is awaitable, and called only when caught up with the daemon and a new block arrives or the mempool is updated.''' super().__init__(env.coin, env.db_engine) + daemon.debug_set_height(self.height) + self.env = env self.daemon = daemon - self.on_update = on_update self.mempool = MemPool(self) self.touched = set() @@ -330,7 +333,6 @@ class BlockProcessor(server.db.DB): self.clean_db() def coros(self): - self.daemon.debug_set_height(self.height) return [self.start(), self.prefetcher.start()] async def start(self): @@ -359,6 +361,7 @@ class BlockProcessor(server.db.DB): await asyncio.sleep(0) # Yield if caught_up: await self.caught_up(mempool_hashes) + self.touched = set() except ChainReorg: await self.handle_chain_reorg() @@ -370,10 +373,7 @@ class BlockProcessor(server.db.DB): if self.first_sync: self.first_sync = False self.logger.info('synced to height {:,d}'.format(self.height)) - if self.on_update: - self.touched.update(await self.mempool.update(mempool_hashes)) - await self.on_update(self.height, self.touched) - self.touched = set() + self.touched.update(await self.mempool.update(mempool_hashes)) async def handle_chain_reorg(self): # First get all state on disk @@ -803,3 +803,60 @@ class BlockProcessor(server.db.DB): Can be positive or negative. 
''' return self.mempool.value(hash168) + + +class BlockServer(BlockProcessor): + '''Like BlockProcessor but also starts servers when caught up.''' + + def __init__(self, env, daemon): + '''on_update is awaitable, and called only when caught up with the + daemon and a new block arrives or the mempool is updated.''' + super().__init__(env, daemon) + self.servers = [] + + async def caught_up(self, mempool_hashes): + await super().caught_up(mempool_hashes) + if not self.servers: + await self.start_servers() + ElectrumX.notify(self.height, self.touched) + + async def start_servers(self): + '''Start listening on RPC, TCP and SSL ports. + + Does not start a server if the port wasn't specified. + ''' + env = self.env + loop = asyncio.get_event_loop() + + JSONRPC.init(self, self.daemon, self.coin) + + protocol = LocalRPC + if env.rpc_port is not None: + host = 'localhost' + rpc_server = loop.create_server(protocol, host, env.rpc_port) + self.servers.append(await rpc_server) + self.logger.info('RPC server listening on {}:{:d}' + .format(host, env.rpc_port)) + + protocol = partial(ElectrumX, env) + if env.tcp_port is not None: + tcp_server = loop.create_server(protocol, env.host, env.tcp_port) + self.servers.append(await tcp_server) + self.logger.info('TCP server listening on {}:{:d}' + .format(env.host, env.tcp_port)) + + if env.ssl_port is not None: + # FIXME: update if we want to require Python >= 3.5.3 + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) + ssl_context.load_cert_chain(env.ssl_certfile, + keyfile=env.ssl_keyfile) + ssl_server = loop.create_server(protocol, env.host, env.ssl_port, + ssl=ssl_context) + self.servers.append(await ssl_server) + self.logger.info('SSL server listening on {}:{:d}' + .format(env.host, env.ssl_port)) + + def stop(self): + '''Close the listening servers.''' + for server in self.servers: + server.close() diff --git a/server/controller.py b/server/controller.py index a30aeb4..dffae97 100644 --- a/server/controller.py +++ b/server/controller.py @@ -12,12 +12,9 @@ client-serving data such as histories. ''' import asyncio -import ssl -from functools import partial from server.daemon import Daemon -from server.block_processor import BlockProcessor -from server.protocol import ElectrumX, LocalRPC, JSONRPC +from server.block_processor import BlockServer from lib.util import LoggedClass @@ -33,10 +30,7 @@ class Controller(LoggedClass): self.env = env self.coin = env.coin self.daemon = Daemon(env.daemon_url, env.debug) - self.block_processor = BlockProcessor(env, self.daemon, - on_update=self.on_update) - JSONRPC.init(self.block_processor, self.daemon, self.coin) - self.servers = [] + self.block_processor = BlockServer(env, self.daemon) def start(self): '''Prime the event loop with asynchronous jobs.''' @@ -45,50 +39,6 @@ class Controller(LoggedClass): for coro in coros: asyncio.ensure_future(coro) - async def on_update(self, height, touched): - if not self.servers: - self.servers = await self.start_servers() - ElectrumX.notify(height, touched) - - async def start_servers(self): - '''Start listening on RPC, TCP and SSL ports. - - Does not start a server if the port wasn't specified. Does - nothing if servers are already running. 
- ''' - servers = [] - env = self.env - loop = self.loop - - protocol = LocalRPC - if env.rpc_port is not None: - host = 'localhost' - rpc_server = loop.create_server(protocol, host, env.rpc_port) - servers.append(await rpc_server) - self.logger.info('RPC server listening on {}:{:d}' - .format(host, env.rpc_port)) - - protocol = partial(ElectrumX, env) - if env.tcp_port is not None: - tcp_server = loop.create_server(protocol, env.host, env.tcp_port) - servers.append(await tcp_server) - self.logger.info('TCP server listening on {}:{:d}' - .format(env.host, env.tcp_port)) - - if env.ssl_port is not None: - # FIXME: update if we want to require Python >= 3.5.3 - ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) - ssl_context.load_cert_chain(env.ssl_certfile, - keyfile=env.ssl_keyfile) - ssl_server = loop.create_server(protocol, env.host, env.ssl_port, - ssl=ssl_context) - servers.append(await ssl_server) - self.logger.info('SSL server listening on {}:{:d}' - .format(env.host, env.ssl_port)) - - return servers - def stop(self): '''Close the listening servers.''' - for server in self.servers: - server.close() + self.block_processor.stop() From 93d53bdd8772721f1eb4d35c4e99f88466f922a7 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Tue, 8 Nov 2016 08:29:56 +0900 Subject: [PATCH 04/17] The controller is dead! --- docs/ARCHITECTURE.rst | 7 ------- electrumx_server.py | 27 ++++++++++-------------- server/block_processor.py | 33 ++++++++++++++++------------- server/controller.py | 44 --------------------------------------- server/db.py | 18 ++++++++++------ 5 files changed, 42 insertions(+), 87 deletions(-) delete mode 100644 server/controller.py diff --git a/docs/ARCHITECTURE.rst b/docs/ARCHITECTURE.rst index 3af6c99..3bd8f5e 100644 --- a/docs/ARCHITECTURE.rst +++ b/docs/ARCHITECTURE.rst @@ -90,10 +90,3 @@ IRC Not currently imlpemented; will handle IRC communication for the ElectrumX servers. - -Controller ----------- - -A historical artefact that currently coordinates some of the above -components. Not pictured as it is doesn't seem to have a logical -place and so is probably going away. diff --git a/electrumx_server.py b/electrumx_server.py index 9429883..94d65c8 100755 --- a/electrumx_server.py +++ b/electrumx_server.py @@ -17,11 +17,11 @@ import traceback from functools import partial from server.env import Env -from server.controller import Controller +from server.block_processor import BlockServer -def cancel_tasks(loop): - # Cancel and collect the remaining tasks +def close_loop(loop): + '''Close the loop down cleanly. Cancel and collect remaining tasks.''' tasks = asyncio.Task.all_tasks() for task in tasks: task.cancel() @@ -31,41 +31,36 @@ def cancel_tasks(loop): except asyncio.CancelledError: pass + loop.close() + def main_loop(): - '''Get tasks; loop until complete.''' + '''Start the server.''' if os.geteuid() == 0: raise Exception('DO NOT RUN AS ROOT! 
Create an unpriveleged user ' 'account and use that') - env = Env() - logging.info('switching current directory to {}'.format(env.db_dir)) - os.chdir(env.db_dir) - loop = asyncio.get_event_loop() #loop.set_debug(True) - controller = Controller(loop, env) - - # Signal handlers def on_signal(signame): '''Call on receipt of a signal to cleanly shutdown.''' logging.warning('received {} signal, preparing to shut down' .format(signame)) loop.stop() + # Install signal handlers for signame in ('SIGINT', 'SIGTERM'): loop.add_signal_handler(getattr(signal, signame), partial(on_signal, signame)) - controller.start() + server = BlockServer(Env()) + server.start() try: loop.run_forever() finally: - controller.stop() - cancel_tasks(loop) - - loop.close() + server.stop() + close_loop(loop) def main(): diff --git a/server/block_processor.py b/server/block_processor.py index c58284a..0309491 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -19,7 +19,7 @@ from collections import defaultdict, namedtuple from functools import partial from server.cache import FSCache, UTXOCache, NO_CACHE_ENTRY -from server.daemon import DaemonError +from server.daemon import Daemon, DaemonError from server.protocol import ElectrumX, LocalRPC, JSONRPC from lib.hash import hash_to_str from lib.tx import Deserializer @@ -78,7 +78,11 @@ class Prefetcher(LoggedClass): else: return blocks, None - async def start(self): + def start(self): + '''Start the prefetcher.''' + asyncio.ensure_future(self.main_loop()) + + async def main_loop(self): '''Loop forever polling for more blocks.''' self.logger.info('starting daemon poll loop...') while True: @@ -290,14 +294,13 @@ class BlockProcessor(server.db.DB): Coordinate backing up in case of chain reorganisations. ''' - def __init__(self, env, daemon): + def __init__(self, env): '''on_update is awaitable, and called only when caught up with the daemon and a new block arrives or the mempool is updated.''' - super().__init__(env.coin, env.db_engine) - daemon.debug_set_height(self.height) + super().__init__(env) - self.env = env - self.daemon = daemon + self.daemon = Daemon(env.daemon_url, env.debug) + self.daemon.debug_set_height(self.height) self.mempool = MemPool(self) self.touched = set() @@ -310,7 +313,7 @@ class BlockProcessor(server.db.DB): # Headers and tx_hashes have one entry per block self.history = defaultdict(partial(array.array, 'I')) self.history_size = 0 - self.prefetcher = Prefetcher(daemon, self.height) + self.prefetcher = Prefetcher(self.daemon, self.height) self.last_flush = time.time() self.last_flush_tx_count = self.tx_count @@ -332,11 +335,13 @@ class BlockProcessor(server.db.DB): self.clean_db() - def coros(self): - return [self.start(), self.prefetcher.start()] + def start(self): + '''Start the block processor.''' + asyncio.ensure_future(self.main_loop()) + self.prefetcher.start() - async def start(self): - '''External entry point for block processing. + async def main_loop(self): + '''Main loop for block processing. Safely flushes the DB on clean shutdown. 
''' @@ -808,10 +813,10 @@ class BlockProcessor(server.db.DB): class BlockServer(BlockProcessor): '''Like BlockProcessor but also starts servers when caught up.''' - def __init__(self, env, daemon): + def __init__(self, env): '''on_update is awaitable, and called only when caught up with the daemon and a new block arrives or the mempool is updated.''' - super().__init__(env, daemon) + super().__init__(env) self.servers = [] async def caught_up(self, mempool_hashes): diff --git a/server/controller.py b/server/controller.py deleted file mode 100644 index dffae97..0000000 --- a/server/controller.py +++ /dev/null @@ -1,44 +0,0 @@ -# Copyright (c) 2016, Neil Booth -# -# All rights reserved. -# -# See the file "LICENCE" for information about the copyright -# and warranty status of this software. - -'''Server controller. - -Coordinates the parts of the server. Serves as a cache for -client-serving data such as histories. -''' - -import asyncio - -from server.daemon import Daemon -from server.block_processor import BlockServer -from lib.util import LoggedClass - - -class Controller(LoggedClass): - - def __init__(self, loop, env): - '''Create up the controller. - - Creates DB, Daemon and BlockProcessor instances. - ''' - super().__init__() - self.loop = loop - self.env = env - self.coin = env.coin - self.daemon = Daemon(env.daemon_url, env.debug) - self.block_processor = BlockServer(env, self.daemon) - - def start(self): - '''Prime the event loop with asynchronous jobs.''' - coros = self.block_processor.coros() - - for coro in coros: - asyncio.ensure_future(coro) - - def stop(self): - '''Close the listening servers.''' - self.block_processor.stop() diff --git a/server/db.py b/server/db.py index f253213..0256ff4 100644 --- a/server/db.py +++ b/server/db.py @@ -9,6 +9,7 @@ import array import ast +import os import struct from collections import namedtuple @@ -26,19 +27,24 @@ class DB(LoggedClass): it was shutdown uncleanly. ''' - def __init__(self, coin, db_engine): + def __init__(self, env): super().__init__() - self.coin = coin + self.env = env + self.coin = env.coin + + self.logger.info('switching current directory to {}' + .format(env.db_dir)) + os.chdir(env.db_dir) # Open DB and metadata files. Record some of its state. - db_name = '{}-{}'.format(coin.NAME, coin.NET) - self.db = open_db(db_name, db_engine) + db_name = '{}-{}'.format(self.coin.NAME, self.coin.NET) + self.db = open_db(db_name, env.db_engine) if self.db.is_new: self.logger.info('created new {} database {}' - .format(db_engine, db_name)) + .format(env.db_engine, db_name)) else: self.logger.info('successfully opened {} database {}' - .format(db_engine, db_name)) + .format(env.db_engine, db_name)) self.init_state_from_db() self.tx_count = self.db_tx_count From f020fcf9770b540c3a6c35e6388b02d37ad5b20b Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Tue, 8 Nov 2016 08:31:43 +0900 Subject: [PATCH 05/17] Update query.py --- query.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/query.py b/query.py index faea87a..8fe7c94 100755 --- a/query.py +++ b/query.py @@ -13,11 +13,10 @@ Not currently documented; might become easier to use in future. 
''' -import os import sys from server.env import Env -from server.block_processor import BlockProcessor +from server.DB import DB from lib.hash import hash_to_str @@ -40,9 +39,8 @@ def count_entries(db): def main(): env = Env() + bp = DB(env) coin = env.coin - os.chdir(env.db_dir) - bp = BlockProcessor(env, None) if len(sys.argv) == 1: count_entries(bp.db) return From 8e6e8329ac06974c414ff7d21ac61f4c00f1a1b5 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Tue, 8 Nov 2016 19:14:52 +0900 Subject: [PATCH 06/17] Remove dead code. Also use native python3 functions to speed up 2 lib functions --- lib/script.py | 76 --------------------------------------------------- lib/util.py | 13 ++------- 2 files changed, 3 insertions(+), 86 deletions(-) diff --git a/lib/script.py b/lib/script.py index 503d38b..98c8fef 100644 --- a/lib/script.py +++ b/lib/script.py @@ -12,8 +12,6 @@ import struct from collections import namedtuple from lib.enum import Enumeration -from lib.hash import hash160 -from lib.util import cachedproperty class ScriptError(Exception): @@ -58,80 +56,6 @@ assert OpCodes.OP_CHECKSIG == 0xac assert OpCodes.OP_CHECKMULTISIG == 0xae -class ScriptSig(object): - '''A script from a tx input. - - Typically provides one or more signatures.''' - - SIG_ADDRESS, SIG_MULTI, SIG_PUBKEY, SIG_UNKNOWN = range(4) - - def __init__(self, script, coin, kind, sigs, pubkeys): - self.script = script - self.coin = coin - self.kind = kind - self.sigs = sigs - self.pubkeys = pubkeys - - @cachedproperty - def address(self): - if self.kind == SIG_ADDRESS: - return self.coin.address_from_pubkey(self.pubkeys[0]) - if self.kind == SIG_MULTI: - return self.coin.multsig_address(self.pubkeys) - return 'Unknown' - - @classmethod - def from_script(cls, script, coin): - '''Return an instance of this class. - - Return an object with kind SIG_UNKNOWN for unrecognised scripts.''' - try: - return cls.parse_script(script, coin) - except ScriptError: - return cls(script, coin, SIG_UNKNOWN, [], []) - - @classmethod - def parse_script(cls, script, coin): - '''Return an instance of this class. - - Raises on unrecognised scripts.''' - ops, datas = Script.get_ops(script) - - # Address, PubKey and P2SH redeems only push data - if not ops or not Script.match_ops(ops, [-1] * len(ops)): - raise ScriptError('unknown scriptsig pattern') - - # Assume double data pushes are address redeems, single data - # pushes are pubkey redeems - if len(ops) == 2: # Signature, pubkey - return cls(script, coin, SIG_ADDRESS, [datas[0]], [datas[1]]) - - if len(ops) == 1: # Pubkey - return cls(script, coin, SIG_PUBKEY, [datas[0]], []) - - # Presumably it is P2SH (though conceivably the above could be - # too; cannot be sure without the send-to script). We only - # handle CHECKMULTISIG P2SH, which because of a bitcoin core - # bug always start with an unused OP_0. - if ops[0] != OpCodes.OP_0: - raise ScriptError('unknown scriptsig pattern; expected OP_0') - - # OP_0, Sig1, ..., SigM, pk_script - m = len(ops) - 2 - pk_script = datas[-1] - pk_ops, pk_datas = Script.get_ops(script) - - # OP_2 pubkey1 pubkey2 pubkey3 OP_3 OP_CHECKMULTISIG - n = len(pk_ops) - 3 - pattern = ([OpCodes.OP_1 + m - 1] + [-1] * n - + [OpCodes.OP_1 + n - 1, OpCodes.OP_CHECKMULTISIG]) - - if m <= n and Script.match_ops(pk_ops, pattern): - return cls(script, coin, SIG_MULTI, datas[1:-1], pk_datas[1:-2]) - - raise ScriptError('unknown multisig P2SH pattern') - - class ScriptPubKey(object): '''A class for handling a tx output script that gives conditions necessary for spending. 
diff --git a/lib/util.py b/lib/util.py index eb34f20..f41d4f2 100644 --- a/lib/util.py +++ b/lib/util.py @@ -82,6 +82,7 @@ def subclasses(base_class, strict=True): pairs = inspect.getmembers(sys.modules[base_class.__module__], select) return [pair[1] for pair in pairs] + def chunks(items, size): '''Break up items, an iterable, into chunks of length size.''' for i in range(0, len(items), size): @@ -90,20 +91,12 @@ def chunks(items, size): def bytes_to_int(be_bytes): '''Interprets a big-endian sequence of bytes as an integer''' - assert isinstance(be_bytes, (bytes, bytearray)) - value = 0 - for byte in be_bytes: - value = value * 256 + byte - return value + return int.from_bytes(be_bytes, 'big') def int_to_bytes(value): '''Converts an integer to a big-endian sequence of bytes''' - mods = [] - while value: - value, mod = divmod(value, 256) - mods.append(mod) - return bytes(reversed(mods)) + return value.to_bytes((value.bit_length() + 7) // 8, 'big') def increment_byte_string(bs): From 5c5e90d5740b19ecf1dc40c3e3ef4653ee11fc8e Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Tue, 8 Nov 2016 19:20:13 +0900 Subject: [PATCH 07/17] Move BlockServer to more appropriate location --- electrumx_server.py | 2 +- server/block_processor.py | 62 +-------------------------------------- server/db.py | 1 + server/protocol.py | 59 +++++++++++++++++++++++++++++++++++++ 4 files changed, 62 insertions(+), 62 deletions(-) diff --git a/electrumx_server.py b/electrumx_server.py index 94d65c8..6a817f8 100755 --- a/electrumx_server.py +++ b/electrumx_server.py @@ -17,7 +17,7 @@ import traceback from functools import partial from server.env import Env -from server.block_processor import BlockServer +from server.protocol import BlockServer def close_loop(loop): diff --git a/server/block_processor.py b/server/block_processor.py index 0309491..23dffcd 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -9,18 +9,15 @@ import array -import ast import asyncio -import ssl import struct import time from bisect import bisect_left -from collections import defaultdict, namedtuple +from collections import defaultdict from functools import partial from server.cache import FSCache, UTXOCache, NO_CACHE_ENTRY from server.daemon import Daemon, DaemonError -from server.protocol import ElectrumX, LocalRPC, JSONRPC from lib.hash import hash_to_str from lib.tx import Deserializer from lib.util import chunks, LoggedClass @@ -808,60 +805,3 @@ class BlockProcessor(server.db.DB): Can be positive or negative. ''' return self.mempool.value(hash168) - - -class BlockServer(BlockProcessor): - '''Like BlockProcessor but also starts servers when caught up.''' - - def __init__(self, env): - '''on_update is awaitable, and called only when caught up with the - daemon and a new block arrives or the mempool is updated.''' - super().__init__(env) - self.servers = [] - - async def caught_up(self, mempool_hashes): - await super().caught_up(mempool_hashes) - if not self.servers: - await self.start_servers() - ElectrumX.notify(self.height, self.touched) - - async def start_servers(self): - '''Start listening on RPC, TCP and SSL ports. - - Does not start a server if the port wasn't specified. 
- ''' - env = self.env - loop = asyncio.get_event_loop() - - JSONRPC.init(self, self.daemon, self.coin) - - protocol = LocalRPC - if env.rpc_port is not None: - host = 'localhost' - rpc_server = loop.create_server(protocol, host, env.rpc_port) - self.servers.append(await rpc_server) - self.logger.info('RPC server listening on {}:{:d}' - .format(host, env.rpc_port)) - - protocol = partial(ElectrumX, env) - if env.tcp_port is not None: - tcp_server = loop.create_server(protocol, env.host, env.tcp_port) - self.servers.append(await tcp_server) - self.logger.info('TCP server listening on {}:{:d}' - .format(env.host, env.tcp_port)) - - if env.ssl_port is not None: - # FIXME: update if we want to require Python >= 3.5.3 - ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) - ssl_context.load_cert_chain(env.ssl_certfile, - keyfile=env.ssl_keyfile) - ssl_server = loop.create_server(protocol, env.host, env.ssl_port, - ssl=ssl_context) - self.servers.append(await ssl_server) - self.logger.info('SSL server listening on {}:{:d}' - .format(env.host, env.ssl_port)) - - def stop(self): - '''Close the listening servers.''' - for server in self.servers: - server.close() diff --git a/server/db.py b/server/db.py index 0256ff4..9215dbd 100644 --- a/server/db.py +++ b/server/db.py @@ -7,6 +7,7 @@ '''Interface to the blockchain database.''' + import array import ast import os diff --git a/server/protocol.py b/server/protocol.py index 53c9572..0ea78f6 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -11,10 +11,12 @@ import asyncio import codecs import json +import ssl import traceback from collections import namedtuple from functools import partial +from server.block_processor import BlockProcessor from server.daemon import DaemonError from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash from lib.util import LoggedClass @@ -30,6 +32,63 @@ def json_notification(method, params): return {'id': None, 'method': method, 'params': params} +class BlockServer(BlockProcessor): + '''Like BlockProcessor but also starts servers when caught up.''' + + def __init__(self, env): + '''on_update is awaitable, and called only when caught up with the + daemon and a new block arrives or the mempool is updated.''' + super().__init__(env) + self.servers = [] + + async def caught_up(self, mempool_hashes): + await super().caught_up(mempool_hashes) + if not self.servers: + await self.start_servers() + ElectrumX.notify(self.height, self.touched) + + async def start_servers(self): + '''Start listening on RPC, TCP and SSL ports. + + Does not start a server if the port wasn't specified. 
+ ''' + env = self.env + loop = asyncio.get_event_loop() + + JSONRPC.init(self, self.daemon, self.coin) + + protocol = LocalRPC + if env.rpc_port is not None: + host = 'localhost' + rpc_server = loop.create_server(protocol, host, env.rpc_port) + self.servers.append(await rpc_server) + self.logger.info('RPC server listening on {}:{:d}' + .format(host, env.rpc_port)) + + protocol = partial(ElectrumX, env) + if env.tcp_port is not None: + tcp_server = loop.create_server(protocol, env.host, env.tcp_port) + self.servers.append(await tcp_server) + self.logger.info('TCP server listening on {}:{:d}' + .format(env.host, env.tcp_port)) + + if env.ssl_port is not None: + # FIXME: update if we want to require Python >= 3.5.3 + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) + ssl_context.load_cert_chain(env.ssl_certfile, + keyfile=env.ssl_keyfile) + ssl_server = loop.create_server(protocol, env.host, env.ssl_port, + ssl=ssl_context) + self.servers.append(await ssl_server) + self.logger.info('SSL server listening on {}:{:d}' + .format(env.host, env.ssl_port)) + + def stop(self): + '''Close the listening servers.''' + for server in self.servers: + server.close() + + AsyncTask = namedtuple('AsyncTask', 'session job') class SessionManager(LoggedClass): From 5736e9cb7068ad9ce64e1f9d0088f21001bab344 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Tue, 8 Nov 2016 19:35:21 +0900 Subject: [PATCH 08/17] Move utxo_cache to BlockProcessor where it belongs --- server/block_processor.py | 5 ++++- server/db.py | 40 ++++++++++++++++++++++++++++++--------- 2 files changed, 35 insertions(+), 10 deletions(-) diff --git a/server/block_processor.py b/server/block_processor.py index 23dffcd..12caf29 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -16,7 +16,7 @@ from bisect import bisect_left from collections import defaultdict from functools import partial -from server.cache import FSCache, UTXOCache, NO_CACHE_ENTRY +from server.cache import UTXOCache, NO_CACHE_ENTRY from server.daemon import Daemon, DaemonError from lib.hash import hash_to_str from lib.tx import Deserializer @@ -315,6 +315,9 @@ class BlockProcessor(server.db.DB): self.last_flush = time.time() self.last_flush_tx_count = self.tx_count + # UTXO cache + self.utxo_cache = UTXOCache(self.get_tx_hash, self.db, self.coin) + # Log state self.logger.info('{}/{} height: {:,d} tx count: {:,d} ' 'flush count: {:,d} utxo flush count: {:,d} ' diff --git a/server/db.py b/server/db.py index 9215dbd..1415514 100644 --- a/server/db.py +++ b/server/db.py @@ -14,13 +14,12 @@ import os import struct from collections import namedtuple -from server.cache import FSCache, UTXOCache, NO_CACHE_ENTRY +from server.cache import FSCache, NO_CACHE_ENTRY from lib.util import LoggedClass from server.storage import open_db UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value") - class DB(LoggedClass): '''Simple wrapper of the backend database for querying. @@ -28,6 +27,9 @@ class DB(LoggedClass): it was shutdown uncleanly. 
''' + class DBError(Exception): + pass + def __init__(self, env): super().__init__() self.env = env @@ -57,9 +59,6 @@ class DB(LoggedClass): self.get_tx_hash = self.fs_cache.get_tx_hash self.read_headers = self.fs_cache.read_headers - # UTXO cache - self.utxo_cache = UTXOCache(self.get_tx_hash, self.db, self.coin) - def init_state_from_db(self): if self.db.is_new: self.db_height = -1 @@ -73,9 +72,9 @@ class DB(LoggedClass): state = self.db.get(b'state') state = ast.literal_eval(state.decode()) if state['genesis'] != self.coin.GENESIS_HASH: - raise ChainError('DB genesis hash {} does not match coin {}' - .format(state['genesis_hash'], - self.coin.GENESIS_HASH)) + raise self.DBError('DB genesis hash {} does not match coin {}' + .format(state['genesis_hash'], + self.coin.GENESIS_HASH)) self.db_height = state['height'] self.db_tx_count = state['tx_count'] self.db_tip = state['tip'] @@ -143,7 +142,30 @@ class DB(LoggedClass): hash168 = None if 0 <= index <= 65535: idx_packed = struct.pack(' Date: Tue, 8 Nov 2016 20:06:06 +0900 Subject: [PATCH 09/17] Remove the FS cache Really belongs with BlockProcessor now --- server/block_processor.py | 86 ++++++++++++++++++-- server/cache.py | 160 +------------------------------------- server/db.py | 89 ++++++++++++++++++--- 3 files changed, 161 insertions(+), 174 deletions(-) diff --git a/server/block_processor.py b/server/block_processor.py index 12caf29..679bc05 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -10,6 +10,8 @@ import array import asyncio +import itertools +import os import struct import time from bisect import bisect_left @@ -411,7 +413,7 @@ class BlockProcessor(server.db.DB): start = self.height - 1 count = 1 while start > 0: - hashes = self.fs_cache.block_hashes(start, count) + hashes = self.block_hashes(start, count) hex_hashes = [hash_to_str(hash) for hash in hashes] d_hex_hashes = await self.daemon.block_hex_hashes(start, count) n = match_pos(hex_hashes, d_hex_hashes) @@ -428,7 +430,7 @@ class BlockProcessor(server.db.DB): 'height {:,d} to height {:,d}' .format(count, start, start + count - 1)) - return self.fs_cache.block_hashes(start, count) + return self.block_hashes(start, count) def clean_db(self): '''Clean out stale DB items. @@ -534,7 +536,9 @@ class BlockProcessor(server.db.DB): if self.height > self.db_height: assert flush_history is None flush_history = self.flush_history - self.fs_cache.flush(self.height, self.tx_count) + self.fs_flush() + self.logger.info('FS flush took {:.1f} seconds' + .format(time.time() - flush_start)) with self.db.write_batch() as batch: # History first - fast and frees memory. Flush state last @@ -593,6 +597,55 @@ class BlockProcessor(server.db.DB): self.history = defaultdict(partial(array.array, 'I')) self.history_size = 0 + def fs_flush(self): + '''Flush the things stored on the filesystem. 
+ The arguments are passed for sanity check assertions only.''' + blocks_done = len(self.headers) + prior_tx_count = (self.tx_counts[self.fs_height] + if self.fs_height >= 0 else 0) + cur_tx_count = self.tx_counts[-1] if self.tx_counts else 0 + txs_done = cur_tx_count - prior_tx_count + + assert self.fs_height + blocks_done == self.height + assert len(self.tx_hashes) == blocks_done + assert len(self.tx_counts) == self.height + 1 + assert cur_tx_count == self.tx_count, \ + 'cur: {:,d} new: {:,d}'.format(cur_tx_count, self.tx_count) + + # First the headers + headers = b''.join(self.headers) + header_len = self.coin.HEADER_LEN + self.headers_file.seek((self.fs_height + 1) * header_len) + self.headers_file.write(headers) + self.headers_file.flush() + + # Then the tx counts + self.txcount_file.seek((self.fs_height + 1) * self.tx_counts.itemsize) + self.txcount_file.write(self.tx_counts[self.fs_height + 1:]) + self.txcount_file.flush() + + # Finally the hashes + hashes = memoryview(b''.join(itertools.chain(*self.tx_hashes))) + assert len(hashes) % 32 == 0 + assert len(hashes) // 32 == txs_done + cursor = 0 + file_pos = prior_tx_count * 32 + while cursor < len(hashes): + file_num, offset = divmod(file_pos, self.tx_hash_file_size) + size = min(len(hashes) - cursor, self.tx_hash_file_size - offset) + filename = 'hashes{:04d}'.format(file_num) + with self.open_file(filename, create=True) as f: + f.seek(offset) + f.write(hashes[cursor:cursor + size]) + cursor += size + file_pos += size + + os.sync() + + self.tx_hashes = [] + self.headers = [] + self.fs_height += blocks_done + def backup_history(self, batch, hash168s): self.logger.info('backing up history to height {:,d} tx_count {:,d}' .format(self.height, self.tx_count)) @@ -662,9 +715,18 @@ class BlockProcessor(server.db.DB): '''Read undo information from a file for the current height.''' return self.db.get(self.undo_key(height)) + def fs_advance_block(self, header, tx_hashes, txs): + '''Update unflushed FS state for a new block.''' + prior_tx_count = self.tx_counts[-1] if self.tx_counts else 0 + + # Cache the new header, tx hashes and cumulative tx count + self.headers.append(header) + self.tx_hashes.append(tx_hashes) + self.tx_counts.append(prior_tx_count + len(txs)) + def advance_block(self, block, update_touched): - # We must update the fs_cache before calling advance_txs() as - # the UTXO cache uses the fs_cache via get_tx_hash() to + # We must update the FS cache before calling advance_txs() as + # the UTXO cache uses the FS cache via get_tx_hash() to # resolve compressed key collisions header, tx_hashes, txs = self.coin.read_block(block) prev_hash, header_hash = self.coin.header_hashes(header) @@ -672,7 +734,7 @@ class BlockProcessor(server.db.DB): raise ChainReorg touched = set() - self.fs_cache.advance_block(header, tx_hashes, txs) + self.fs_advance_block(header, tx_hashes, txs) self.tip = header_hash self.height += 1 undo_info = self.advance_txs(tx_hashes, txs, touched) @@ -733,6 +795,16 @@ class BlockProcessor(server.db.DB): return undo_info + def fs_backup_block(self): + '''Revert a block.''' + assert not self.headers + assert not self.tx_hashes + assert self.fs_height >= 0 + # Just update in-memory. It doesn't matter if disk files are + # too long, they will be overwritten when advancing. + self.fs_height -= 1 + self.tx_counts.pop() + def backup_blocks(self, blocks): '''Backup the blocks and flush. 
@@ -752,7 +824,7 @@ class BlockProcessor(server.db.DB): hash_to_str(self.tip), self.height)) self.backup_txs(tx_hashes, txs, touched) - self.fs_cache.backup_block() + self.fs_backup_block() self.tip = prev_hash self.height -= 1 diff --git a/server/cache.py b/server/cache.py index f06eb49..834f5b2 100644 --- a/server/cache.py +++ b/server/cache.py @@ -12,14 +12,10 @@ Once synced flushes are performed after processing each block. ''' -import array -import itertools -import os import struct -from bisect import bisect_right -from lib.util import chunks, LoggedClass -from lib.hash import double_sha256, hash_to_str +from lib.util import LoggedClass +from lib.hash import hash_to_str # History can hold approx. 65536 * HIST_ENTRIES_PER_KEY entries @@ -259,155 +255,3 @@ class UTXOCache(LoggedClass): .format(new_utxos, self.db_deletes, hcolls, ucolls)) self.cache_spends = self.db_deletes = 0 - - -class FSCache(LoggedClass): - - def __init__(self, coin, height, tx_count): - super().__init__() - - self.coin = coin - self.tx_hash_file_size = 16 * 1024 * 1024 - assert self.tx_hash_file_size % 32 == 0 - - # On-disk values, updated by a flush - self.height = height - - # Unflushed items - self.headers = [] - self.tx_hashes = [] - - is_new = height == -1 - self.headers_file = self.open_file('headers', is_new) - self.txcount_file = self.open_file('txcount', is_new) - - # tx_counts[N] has the cumulative number of txs at the end of - # height N. So tx_counts[0] is 1 - the genesis coinbase - self.tx_counts = array.array('I') - self.txcount_file.seek(0) - self.tx_counts.fromfile(self.txcount_file, self.height + 1) - if self.tx_counts: - assert tx_count == self.tx_counts[-1] - else: - assert tx_count == 0 - - def open_file(self, filename, create=False): - '''Open the file name. Return its handle.''' - try: - return open(filename, 'rb+') - except FileNotFoundError: - if create: - return open(filename, 'wb+') - raise - - def advance_block(self, header, tx_hashes, txs): - '''Update the FS cache for a new block.''' - prior_tx_count = self.tx_counts[-1] if self.tx_counts else 0 - - # Cache the new header, tx hashes and cumulative tx count - self.headers.append(header) - self.tx_hashes.append(tx_hashes) - self.tx_counts.append(prior_tx_count + len(txs)) - - def backup_block(self): - '''Revert a block.''' - assert not self.headers - assert not self.tx_hashes - assert self.height >= 0 - # Just update in-memory. It doesn't matter if disk files are - # too long, they will be overwritten when advancing. - self.height -= 1 - self.tx_counts.pop() - - def flush(self, new_height, new_tx_count): - '''Flush the things stored on the filesystem. 
- The arguments are passed for sanity check assertions only.''' - self.logger.info('flushing to file system') - - blocks_done = len(self.headers) - prior_tx_count = self.tx_counts[self.height] if self.height >= 0 else 0 - cur_tx_count = self.tx_counts[-1] if self.tx_counts else 0 - txs_done = cur_tx_count - prior_tx_count - - assert self.height + blocks_done == new_height - assert len(self.tx_hashes) == blocks_done - assert len(self.tx_counts) == new_height + 1 - assert cur_tx_count == new_tx_count, \ - 'cur: {:,d} new: {:,d}'.format(cur_tx_count, new_tx_count) - - # First the headers - headers = b''.join(self.headers) - header_len = self.coin.HEADER_LEN - self.headers_file.seek((self.height + 1) * header_len) - self.headers_file.write(headers) - self.headers_file.flush() - - # Then the tx counts - self.txcount_file.seek((self.height + 1) * self.tx_counts.itemsize) - self.txcount_file.write(self.tx_counts[self.height + 1:]) - self.txcount_file.flush() - - # Finally the hashes - hashes = memoryview(b''.join(itertools.chain(*self.tx_hashes))) - assert len(hashes) % 32 == 0 - assert len(hashes) // 32 == txs_done - cursor = 0 - file_pos = prior_tx_count * 32 - while cursor < len(hashes): - file_num, offset = divmod(file_pos, self.tx_hash_file_size) - size = min(len(hashes) - cursor, self.tx_hash_file_size - offset) - filename = 'hashes{:04d}'.format(file_num) - with self.open_file(filename, create=True) as f: - f.seek(offset) - f.write(hashes[cursor:cursor + size]) - cursor += size - file_pos += size - - os.sync() - - self.tx_hashes = [] - self.headers = [] - self.height += blocks_done - - def read_headers(self, start, count): - result = b'' - - # Read some from disk - disk_count = min(count, self.height + 1 - start) - if disk_count > 0: - header_len = self.coin.HEADER_LEN - assert start >= 0 - self.headers_file.seek(start * header_len) - result = self.headers_file.read(disk_count * header_len) - count -= disk_count - start += disk_count - - # The rest from memory - start -= self.height + 1 - assert count >= 0 and start + count <= len(self.headers) - result += b''.join(self.headers[start: start + count]) - - return result - - def get_tx_hash(self, tx_num): - '''Returns the tx_hash and height of a tx number.''' - height = bisect_right(self.tx_counts, tx_num) - - # Is this on disk or unflushed? 
- if height > self.height: - tx_hashes = self.tx_hashes[height - (self.height + 1)] - tx_hash = tx_hashes[tx_num - self.tx_counts[height - 1]] - else: - file_pos = tx_num * 32 - file_num, offset = divmod(file_pos, self.tx_hash_file_size) - filename = 'hashes{:04d}'.format(file_num) - with self.open_file(filename) as f: - f.seek(offset) - tx_hash = f.read(32) - - return tx_hash, height - - def block_hashes(self, height, count): - headers = self.read_headers(height, count) - hlen = self.coin.HEADER_LEN - return [double_sha256(header) for header in chunks(headers, hlen)] diff --git a/server/db.py b/server/db.py index 1415514..fd2d65f 100644 --- a/server/db.py +++ b/server/db.py @@ -12,10 +12,11 @@ import array import ast import os import struct +from bisect import bisect_right from collections import namedtuple -from server.cache import FSCache, NO_CACHE_ENTRY -from lib.util import LoggedClass +from lib.util import chunks, LoggedClass +from lib.hash import double_sha256 from server.storage import open_db UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value") @@ -54,10 +55,29 @@ class DB(LoggedClass): self.height = self.db_height self.tip = self.db_tip - # Cache wrapping the filesystem and redirected functions - self.fs_cache = FSCache(self.coin, self.height, self.tx_count) - self.get_tx_hash = self.fs_cache.get_tx_hash - self.read_headers = self.fs_cache.read_headers + # -- FS related members -- + self.tx_hash_file_size = 16 * 1024 * 1024 + + # On-disk height updated by a flush + self.fs_height = self.height + + # Unflushed items + self.headers = [] + self.tx_hashes = [] + + create = self.height == -1 + self.headers_file = self.open_file('headers', create) + self.txcount_file = self.open_file('txcount', create) + + # tx_counts[N] has the cumulative number of txs at the end of + # height N. So tx_counts[0] is 1 - the genesis coinbase + self.tx_counts = array.array('I') + self.txcount_file.seek(0) + self.tx_counts.fromfile(self.txcount_file, self.height + 1) + if self.tx_counts: + assert self.tx_count == self.tx_counts[-1] + else: + assert self.tx_count == 0 def init_state_from_db(self): if self.db.is_new: @@ -83,6 +103,59 @@ class DB(LoggedClass): self.wall_time = state['wall_time'] self.first_sync = state.get('first_sync', True) + def open_file(self, filename, create=False): + '''Open the file name. Return its handle.''' + try: + return open(filename, 'rb+') + except FileNotFoundError: + if create: + return open(filename, 'wb+') + raise + + def read_headers(self, start, count): + result = b'' + + # Read some from disk + disk_count = min(count, self.fs_height + 1 - start) + if disk_count > 0: + header_len = self.coin.HEADER_LEN + assert start >= 0 + self.headers_file.seek(start * header_len) + result = self.headers_file.read(disk_count * header_len) + count -= disk_count + start += disk_count + + # The rest from memory + start -= self.fs_height + 1 + assert count >= 0 and start + count <= len(self.headers) + result += b''.join(self.headers[start: start + count]) + + return result + + def get_tx_hash(self, tx_num): + '''Returns the tx_hash and height of a tx number.''' + height = bisect_right(self.tx_counts, tx_num) + + # Is this on disk or unflushed? 
+ if height > self.fs_height: + tx_hashes = self.tx_hashes[height - (self.fs_height + 1)] + tx_hash = tx_hashes[tx_num - self.tx_counts[height - 1]] + else: + file_pos = tx_num * 32 + file_num, offset = divmod(file_pos, self.tx_hash_file_size) + filename = 'hashes{:04d}'.format(file_num) + with self.open_file(filename) as f: + f.seek(offset) + tx_hash = f.read(32) + + return tx_hash, height + + def block_hashes(self, height, count): + headers = self.read_headers(height, count) + # FIXME: move to coins.py + hlen = self.coin.HEADER_LEN + return [double_sha256(header) for header in chunks(headers, hlen)] + @staticmethod def _resolve_limit(limit): if limit is None: @@ -142,9 +215,7 @@ class DB(LoggedClass): hash168 = None if 0 <= index <= 65535: idx_packed = struct.pack(' Date: Tue, 8 Nov 2016 23:16:12 +0900 Subject: [PATCH 10/17] Move the rest of fs_cache into BlockProcessor --- server/block_processor.py | 110 +++++++++++++++++++++++--------------- server/db.py | 84 +++++++++++------------------ 2 files changed, 98 insertions(+), 96 deletions(-) diff --git a/server/block_processor.py b/server/block_processor.py index 679bc05..a34e6cd 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -298,6 +298,11 @@ class BlockProcessor(server.db.DB): daemon and a new block arrives or the mempool is updated.''' super().__init__(env) + # These are our state as we move ahead of DB state + self.height = self.db_height + self.tip = self.db_tip + self.tx_count = self.db_tx_count + self.daemon = Daemon(env.daemon_url, env.debug) self.daemon.debug_set_height(self.height) self.mempool = MemPool(self) @@ -317,8 +322,10 @@ class BlockProcessor(server.db.DB): self.last_flush = time.time() self.last_flush_tx_count = self.tx_count - # UTXO cache + # Caches of unflushed items self.utxo_cache = UTXOCache(self.get_tx_hash, self.db, self.coin) + self.headers = [] + self.tx_hashes = [] # Log state self.logger.info('{}/{} height: {:,d} tx count: {:,d} ' @@ -413,7 +420,7 @@ class BlockProcessor(server.db.DB): start = self.height - 1 count = 1 while start > 0: - hashes = self.block_hashes(start, count) + hashes = self.fs_block_hashes(start, count) hex_hashes = [hash_to_str(hash) for hash in hashes] d_hex_hashes = await self.daemon.block_hex_hashes(start, count) n = match_pos(hex_hashes, d_hex_hashes) @@ -430,7 +437,7 @@ class BlockProcessor(server.db.DB): 'height {:,d} to height {:,d}' .format(count, start, start + count - 1)) - return self.block_hashes(start, count) + return self.fs_block_hashes(start, count) def clean_db(self): '''Clean out stale DB items. @@ -504,9 +511,6 @@ class BlockProcessor(server.db.DB): self.height - self.db_height)) self.utxo_cache.flush(batch) self.utxo_flush_count = self.flush_count - self.db_tx_count = self.tx_count - self.db_height = self.height - self.db_tip = self.tip def assert_flushed(self): '''Asserts state is fully flushed.''' @@ -524,39 +528,40 @@ class BlockProcessor(server.db.DB): self.assert_flushed() return + self.flush_count += 1 flush_start = time.time() last_flush = self.last_flush tx_diff = self.tx_count - self.last_flush_tx_count show_stats = self.first_sync - # Write out the files to the FS before flushing to the DB. If - # the DB transaction fails, the files being too long doesn't - # matter. But if writing the files fails we do not want to - # have updated the DB. 
if self.height > self.db_height: assert flush_history is None flush_history = self.flush_history - self.fs_flush() - self.logger.info('FS flush took {:.1f} seconds' - .format(time.time() - flush_start)) with self.db.write_batch() as batch: # History first - fast and frees memory. Flush state last # as it reads the wall time. flush_history(batch) if flush_utxos: + self.fs_flush() self.flush_utxos(batch) self.flush_state(batch) self.logger.info('committing transaction...') + # Update our in-memory state after successful flush + self.db_tx_count = self.tx_count + self.db_height = self.height + self.db_tip = self.tip + self.tx_hashes = [] + self.headers = [] + # Update and put the wall time again - otherwise we drop the # time it took to commit the batch self.flush_state(self.db) - flush_time = int(self.last_flush - flush_start) self.logger.info('flush #{:,d} to height {:,d} txs: {:,d} took {:,d}s' .format(self.flush_count, self.height, self.tx_count, - flush_time)) + int(self.last_flush - flush_start))) # Catch-up stats if show_stats: @@ -582,31 +587,30 @@ class BlockProcessor(server.db.DB): formatted_time(tx_est / this_tx_per_sec))) def flush_history(self, batch): - self.logger.info('flushing history') - - self.flush_count += 1 + flush_start = time.time() flush_id = struct.pack('>H', self.flush_count) for hash168, hist in self.history.items(): key = b'H' + hash168 + flush_id batch.put(key, hist.tobytes()) - self.logger.info('{:,d} history entries in {:,d} addrs' - .format(self.history_size, len(self.history))) - + self.logger.info('flushed {:,d} history entries for {:,d} addrs ' + 'in {:,d}s' + .format(self.history_size, len(self.history), + int(time.time() - flush_start))) self.history = defaultdict(partial(array.array, 'I')) self.history_size = 0 def fs_flush(self): - '''Flush the things stored on the filesystem. 
- The arguments are passed for sanity check assertions only.''' + '''Flush the things stored on the filesystem.''' + flush_start = time.time() blocks_done = len(self.headers) - prior_tx_count = (self.tx_counts[self.fs_height] - if self.fs_height >= 0 else 0) + prior_tx_count = (self.tx_counts[self.db_height] + if self.db_height >= 0 else 0) cur_tx_count = self.tx_counts[-1] if self.tx_counts else 0 txs_done = cur_tx_count - prior_tx_count - assert self.fs_height + blocks_done == self.height + assert self.db_height + blocks_done == self.height assert len(self.tx_hashes) == blocks_done assert len(self.tx_counts) == self.height + 1 assert cur_tx_count == self.tx_count, \ @@ -615,13 +619,13 @@ class BlockProcessor(server.db.DB): # First the headers headers = b''.join(self.headers) header_len = self.coin.HEADER_LEN - self.headers_file.seek((self.fs_height + 1) * header_len) + self.headers_file.seek((self.db_height + 1) * header_len) self.headers_file.write(headers) self.headers_file.flush() # Then the tx counts - self.txcount_file.seek((self.fs_height + 1) * self.tx_counts.itemsize) - self.txcount_file.write(self.tx_counts[self.fs_height + 1:]) + self.txcount_file.seek((self.db_height + 1) * self.tx_counts.itemsize) + self.txcount_file.write(self.tx_counts[self.db_height + 1:]) self.txcount_file.flush() # Finally the hashes @@ -642,9 +646,8 @@ class BlockProcessor(server.db.DB): os.sync() - self.tx_hashes = [] - self.headers = [] - self.fs_height += blocks_done + self.logger.info('FS flush took {:.1f} seconds' + .format(time.time() - flush_start)) def backup_history(self, batch, hash168s): self.logger.info('backing up history to height {:,d} tx_count {:,d}' @@ -795,16 +798,6 @@ class BlockProcessor(server.db.DB): return undo_info - def fs_backup_block(self): - '''Revert a block.''' - assert not self.headers - assert not self.tx_hashes - assert self.fs_height >= 0 - # Just update in-memory. It doesn't matter if disk files are - # too long, they will be overwritten when advancing. - self.fs_height -= 1 - self.tx_counts.pop() - def backup_blocks(self, blocks): '''Backup the blocks and flush. @@ -824,10 +817,13 @@ class BlockProcessor(server.db.DB): hash_to_str(self.tip), self.height)) self.backup_txs(tx_hashes, txs, touched) - self.fs_backup_block() self.tip = prev_hash + assert self.height >= 0 self.height -= 1 + assert not self.headers + assert not self.tx_hashes + self.logger.info('backed up to height {:,d}'.format(self.height)) self.touched.update(touched) @@ -866,6 +862,34 @@ class BlockProcessor(server.db.DB): assert n == 0 self.tx_count -= len(txs) + def read_headers(self, start, count): + # Read some from disk + disk_count = min(count, self.db_height + 1 - start) + result = self.fs_read_headers(start, disk_count) + count -= disk_count + start += disk_count + + # The rest from memory + if count: + start -= self.db_height + 1 + if not (count >= 0 and start + count <= len(self.headers)): + raise ChainError('{:,d} headers starting at {:,d} not on disk' + .format(count, start)) + result += b''.join(self.headers[start: start + count]) + + return result + + def get_tx_hash(self, tx_num): + '''Returns the tx_hash and height of a tx number.''' + tx_hash, tx_height = self.fs_tx_hash(tx_num) + + # Is this unflushed? 
+        if tx_hash is None:
+            tx_hashes = self.tx_hashes[tx_height - (self.db_height + 1)]
+            tx_hash = tx_hashes[tx_num - self.tx_counts[tx_height - 1]]
+
+        return tx_hash, tx_height
+
     def mempool_transactions(self, hash168):
         '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
         entries for the hash168.
diff --git a/server/db.py b/server/db.py
index fd2d65f..414eb75 100644
--- a/server/db.py
+++ b/server/db.py
@@ -49,35 +49,22 @@ class DB(LoggedClass):
         else:
             self.logger.info('successfully opened {} database {}'
                              .format(env.db_engine, db_name))
-
         self.init_state_from_db()
-        self.tx_count = self.db_tx_count
-        self.height = self.db_height
-        self.tip = self.db_tip
 
-        # -- FS related members --
-        self.tx_hash_file_size = 16 * 1024 * 1024
-
-        # On-disk height updated by a flush
-        self.fs_height = self.height
-
-        # Unflushed items
-        self.headers = []
-        self.tx_hashes = []
-
-        create = self.height == -1
+        create = self.db_height == -1
         self.headers_file = self.open_file('headers', create)
         self.txcount_file = self.open_file('txcount', create)
+        self.tx_hash_file_size = 16 * 1024 * 1024
 
         # tx_counts[N] has the cumulative number of txs at the end of
         # height N. So tx_counts[0] is 1 - the genesis coinbase
         self.tx_counts = array.array('I')
         self.txcount_file.seek(0)
-        self.tx_counts.fromfile(self.txcount_file, self.height + 1)
+        self.tx_counts.fromfile(self.txcount_file, self.db_height + 1)
         if self.tx_counts:
-            assert self.tx_count == self.tx_counts[-1]
+            assert self.db_tx_count == self.tx_counts[-1]
         else:
-            assert self.tx_count == 0
+            assert self.db_tx_count == 0
 
     def init_state_from_db(self):
         if self.db.is_new:
@@ -112,46 +99,37 @@ class DB(LoggedClass):
                 return open(filename, 'wb+')
             raise
 
-    def read_headers(self, start, count):
-        result = b''
-
+    def fs_read_headers(self, start, count):
         # Read some from disk
-        disk_count = min(count, self.fs_height + 1 - start)
-        if disk_count > 0:
+        disk_count = min(count, self.db_height + 1 - start)
+        if start < 0 or count < 0 or disk_count != count:
+            raise self.DBError('{:,d} headers starting at {:,d} not on disk'
+                               .format(count, start))
+        if disk_count:
             header_len = self.coin.HEADER_LEN
-            assert start >= 0
             self.headers_file.seek(start * header_len)
-            result = self.headers_file.read(disk_count * header_len)
-            count -= disk_count
-            start += disk_count
+            return self.headers_file.read(disk_count * header_len)
+        return b''
 
-        # The rest from memory
-        start -= self.fs_height + 1
-        assert count >= 0 and start + count <= len(self.headers)
-        result += b''.join(self.headers[start: start + count])
+    def fs_tx_hash(self, tx_num):
+        '''Return a pair (tx_hash, tx_height) for the given tx number.
 
-        return result
+        If the tx_height is not on disk, returns (None, tx_height).'''
+        tx_height = bisect_right(self.tx_counts, tx_num)
 
-    def get_tx_hash(self, tx_num):
-        '''Returns the tx_hash and height of a tx number.'''
-        height = bisect_right(self.tx_counts, tx_num)
+        if tx_height > self.db_height:
+            return None, tx_height
         raise self.DBError('tx_num {:,d} is not on disk')
 
-        # Is this on disk or unflushed?
- if height > self.fs_height: - tx_hashes = self.tx_hashes[height - (self.fs_height + 1)] - tx_hash = tx_hashes[tx_num - self.tx_counts[height - 1]] - else: - file_pos = tx_num * 32 - file_num, offset = divmod(file_pos, self.tx_hash_file_size) - filename = 'hashes{:04d}'.format(file_num) - with self.open_file(filename) as f: - f.seek(offset) - tx_hash = f.read(32) + file_pos = tx_num * 32 + file_num, offset = divmod(file_pos, self.tx_hash_file_size) + filename = 'hashes{:04d}'.format(file_num) + with self.open_file(filename) as f: + f.seek(offset) + return f.read(32), tx_height - return tx_hash, height - - def block_hashes(self, height, count): - headers = self.read_headers(height, count) + def fs_block_hashes(self, height, count): + headers = self.fs_read_headers(height, count) # FIXME: move to coins.py hlen = self.coin.HEADER_LEN return [double_sha256(header) for header in chunks(headers, hlen)] @@ -178,7 +156,7 @@ class DB(LoggedClass): for tx_num in a: if limit == 0: return - yield self.get_tx_hash(tx_num) + yield self.fs_tx_hash(tx_num) limit -= 1 def get_balance(self, hash168): @@ -201,7 +179,7 @@ class DB(LoggedClass): return (tx_num,) = unpack(' Date: Tue, 8 Nov 2016 23:37:19 +0900 Subject: [PATCH 11/17] Better logging of initial mempool fill --- server/block_processor.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/server/block_processor.py b/server/block_processor.py index a34e6cd..c995c69 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -180,8 +180,9 @@ class MemPool(LoggedClass): hex_hashes = set(hex_hashes) touched = set() - if self.count < 0: - self.logger.info('initial fetch of {:,d} daemon mempool txs' + initial = self.count < 0 + if initial: + self.logger.info('beginning import of {:,d} mempool txs' .format(len(hex_hashes))) # Remove gone items @@ -201,6 +202,8 @@ class MemPool(LoggedClass): # them into a dictionary of hex hash to deserialized tx. 
hex_hashes.difference_update(self.txs) raw_txs = await self.bp.daemon.getrawtransactions(hex_hashes) + if initial: + self.logger.info('all fetched, now analysing...') new_txs = {hex_hash: Deserializer(raw_tx).read_tx() for hex_hash, raw_tx in zip(hex_hashes, raw_txs) if raw_tx} del raw_txs, hex_hashes @@ -232,8 +235,17 @@ class MemPool(LoggedClass): value, = struct.unpack(' next_log: + next_log = time.time() + 10 + self.logger.info('{:,d} done ({:d}%)' + .format(n, int(n / len(new_txs) * 100))) txout_pairs = self.txs[hex_hash][1] try: infos = (txin_info(txin) for txin in tx.inputs) From ed44c6ab73d497a937bdcf0731339f12e014d7f4 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Wed, 9 Nov 2016 00:47:49 +0900 Subject: [PATCH 12/17] Clean up startup procedure --- docs/ARCHITECTURE.rst | 12 ++++++------ electrumx_server.py | 28 ++++++++-------------------- server/block_processor.py | 11 +++-------- 3 files changed, 17 insertions(+), 34 deletions(-) diff --git a/docs/ARCHITECTURE.rst b/docs/ARCHITECTURE.rst index 3bd8f5e..98c43c4 100644 --- a/docs/ARCHITECTURE.rst +++ b/docs/ARCHITECTURE.rst @@ -15,13 +15,13 @@ The components of the server are roughly like this:: - ElectrumX -<<<<<- LocalRPC - ------------- ------------ < > - ---------- ------------------- ---------- - - Daemon -<<<<<<<<- Block processor ->>>>- Caches - - ---------- ------------------- ---------- + ---------- ------------------- -------------- + - Daemon -<<<<<<<<- Block processor ->>>>- UTXO Cache - + ---------- ------------------- -------------- < < > < - -------------- ----------- - - Prefetcher - - Storage - - -------------- ----------- + -------------- ---------------- + - Prefetcher - - FS + Storage - + -------------- ---------------- Env diff --git a/electrumx_server.py b/electrumx_server.py index 6a817f8..939bd2e 100755 --- a/electrumx_server.py +++ b/electrumx_server.py @@ -20,20 +20,6 @@ from server.env import Env from server.protocol import BlockServer -def close_loop(loop): - '''Close the loop down cleanly. 
Cancel and collect remaining tasks.''' - tasks = asyncio.Task.all_tasks() - for task in tasks: - task.cancel() - - try: - loop.run_until_complete(asyncio.gather(*tasks)) - except asyncio.CancelledError: - pass - - loop.close() - - def main_loop(): '''Start the server.''' if os.geteuid() == 0: @@ -45,9 +31,9 @@ def main_loop(): def on_signal(signame): '''Call on receipt of a signal to cleanly shutdown.''' - logging.warning('received {} signal, preparing to shut down' - .format(signame)) - loop.stop() + logging.warning('received {} signal, shutting down'.format(signame)) + for task in asyncio.Task.all_tasks(): + task.cancel() # Install signal handlers for signame in ('SIGINT', 'SIGTERM'): @@ -55,12 +41,14 @@ def main_loop(): partial(on_signal, signame)) server = BlockServer(Env()) - server.start() + future = server.start() try: - loop.run_forever() + loop.run_until_complete(future) + except asyncio.CancelledError: + pass finally: server.stop() - close_loop(loop) + loop.close() def main(): diff --git a/server/block_processor.py b/server/block_processor.py index c995c69..b44a33e 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -77,10 +77,6 @@ class Prefetcher(LoggedClass): else: return blocks, None - def start(self): - '''Start the prefetcher.''' - asyncio.ensure_future(self.main_loop()) - async def main_loop(self): '''Loop forever polling for more blocks.''' self.logger.info('starting daemon poll loop...') @@ -179,7 +175,6 @@ class MemPool(LoggedClass): ''' hex_hashes = set(hex_hashes) touched = set() - initial = self.count < 0 if initial: self.logger.info('beginning import of {:,d} mempool txs' @@ -357,9 +352,9 @@ class BlockProcessor(server.db.DB): self.clean_db() def start(self): - '''Start the block processor.''' - asyncio.ensure_future(self.main_loop()) - self.prefetcher.start() + '''Returns a future that starts the block processor when awaited.''' + return asyncio.gather(self.main_loop(), + self.prefetcher.main_loop()) async def main_loop(self): '''Main loop for block processing. From c6a57daf6a75c863f15ddcfaf3addc9dcad57779 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Wed, 9 Nov 2016 06:32:26 +0900 Subject: [PATCH 13/17] Tweak daemon messages --- server/daemon.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/daemon.py b/server/daemon.py index f785fc2..e94af41 100644 --- a/server/daemon.py +++ b/server/daemon.py @@ -86,11 +86,11 @@ class Daemon(util.LoggedClass): except aiohttp.ClientHttpProcessingError: msg = 'HTTP error' except aiohttp.ServerDisconnectedError: - msg = 'daemon disconnected' + msg = 'disconnected' except aiohttp.ClientConnectionError: msg = 'connection problem - is your daemon running?' except DaemonWarmingUpError: - msg = 'daemon is still warming up' + msg = 'still starting up checking blocks...' if msg != prior_msg or count == 10: self.logger.error('{}. Retrying between sleeps...' 
From 212d653b5bbe8af903a7c63c145890cfbd6e681e Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Wed, 9 Nov 2016 06:45:24 +0900 Subject: [PATCH 14/17] Yield during mempool updates --- server/block_processor.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/server/block_processor.py b/server/block_processor.py index b44a33e..f6e1bbf 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -211,7 +211,10 @@ class MemPool(LoggedClass): def txout_pair(txout): return (script_hash168(txout.pk_script), txout.value) - for hex_hash, tx in new_txs.items(): + for n, (hex_hash, tx) in enumerate(new_txs.items()): + # Yield to process e.g. signals + if n % 500 == 0: + await asyncio.sleep(0) txout_pairs = [txout_pair(txout) for txout in tx.outputs] self.txs[hex_hash] = (None, txout_pairs, None) @@ -237,10 +240,15 @@ class MemPool(LoggedClass): # Now add the inputs for n, (hex_hash, tx) in enumerate(new_txs.items()): + # Yield to process e.g. signals + if n % 100 == 0: + await asyncio.sleep(0) + if initial and time.time() > next_log: next_log = time.time() + 10 self.logger.info('{:,d} done ({:d}%)' .format(n, int(n / len(new_txs) * 100))) + txout_pairs = self.txs[hex_hash][1] try: infos = (txin_info(txin) for txin in tx.inputs) From 55cc1d3d65a404185acde95447d9c8e09c24f6c5 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Wed, 9 Nov 2016 06:50:00 +0900 Subject: [PATCH 15/17] More regularly --- server/block_processor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/block_processor.py b/server/block_processor.py index f6e1bbf..52b5b7a 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -241,7 +241,7 @@ class MemPool(LoggedClass): # Now add the inputs for n, (hex_hash, tx) in enumerate(new_txs.items()): # Yield to process e.g. signals - if n % 100 == 0: + if n % 50 == 0: await asyncio.sleep(0) if initial and time.time() > next_log: From a07ed5876b5a0fc49ddc189a279cd52c0b919c33 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Wed, 9 Nov 2016 07:29:45 +0900 Subject: [PATCH 16/17] Count missing UTXOs and log just once They're a temporary phenomenon but it's handy to see how often these things happen. Fixes #10. --- server/block_processor.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/server/block_processor.py b/server/block_processor.py index 52b5b7a..feb4eee 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -175,6 +175,8 @@ class MemPool(LoggedClass): ''' hex_hashes = set(hex_hashes) touched = set() + missing_utxos = 0 + initial = self.count < 0 if initial: self.logger.info('beginning import of {:,d} mempool txs' @@ -225,10 +227,8 @@ class MemPool(LoggedClass): return mempool_entry[1][txin.prev_idx], True entry = utxo_lookup(txin.prev_hash, txin.prev_idx) if entry == NO_CACHE_ENTRY: - # Not possible unless daemon is lying or we're corrupted? 
- self.logger.warning('no UTXO found for {} / {}' - .format(hash_to_str(txin.prev_hash), - txin.prev_idx)) + # This happens when the daemon is a block ahead of us + # and has mempool txs spending new txs in that block raise MissingUTXOError value, = struct.unpack(' Date: Wed, 9 Nov 2016 07:44:17 +0900 Subject: [PATCH 17/17] Prepare 0.2.2 --- docs/RELEASE-NOTES | 11 +++++++++++ server/version.py | 2 +- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/docs/RELEASE-NOTES b/docs/RELEASE-NOTES index bf1214c..cf34aed 100644 --- a/docs/RELEASE-NOTES +++ b/docs/RELEASE-NOTES @@ -1,3 +1,14 @@ +Version 0.2.2 +------------- + +- mostly refactoring: controller.py is gone; cache.py is half-gone. + Split BlockProcessor into 3: DB, BlockProcessor and BlockServer. DB + handles stored DB and FS state; BlockProcessor handles pushing the + chain forward and caching of updates, and BlockServer will + additionally serve clients on catchup. More to come. +- mempool: better logging; also yields during initial seeding +- issues fixed: #10 + Version 0.2.1 ------------- diff --git a/server/version.py b/server/version.py index 7dd5120..79efc29 100644 --- a/server/version.py +++ b/server/version.py @@ -1 +1 @@ -VERSION = "ElectrumX 0.2.1" +VERSION = "ElectrumX 0.2.2"
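
The startup rework in patch 12 follows a common asyncio shutdown pattern: gather the long-running coroutines into a single future, cancel every task from the signal handler, and let run_until_complete() unwind through CancelledError before closing the loop. The sketch below is a self-contained illustration of that pattern only, not ElectrumX code: the worker() coroutines are hypothetical stand-ins for the block processor and prefetcher loops, and it deliberately uses the same Python 3.5-era asyncio API as the patches (get_event_loop, asyncio.Task.all_tasks), which later Python releases superseded:

    import asyncio
    import signal
    from functools import partial


    async def worker(name):
        '''Hypothetical stand-in for a long-running loop such as main_loop().'''
        try:
            while True:
                await asyncio.sleep(1)
        except asyncio.CancelledError:
            print('{} cancelled, cleaning up'.format(name))
            raise


    def on_signal(signame):
        '''Cancel every task; run_until_complete() then falls out below.'''
        print('received {} signal, shutting down'.format(signame))
        for task in asyncio.Task.all_tasks():
            task.cancel()


    def main():
        loop = asyncio.get_event_loop()
        for signame in ('SIGINT', 'SIGTERM'):
            loop.add_signal_handler(getattr(signal, signame),
                                    partial(on_signal, signame))

        # Analogue of server.start(): one future covering all the loops
        future = asyncio.gather(worker('processor'), worker('prefetcher'))
        try:
            loop.run_until_complete(future)
        except asyncio.CancelledError:
            pass
        finally:
            loop.close()


    if __name__ == '__main__':
        main()

Cancelling the tasks, rather than stopping the loop as the earlier code did, gives each coroutine a chance to run its CancelledError cleanup before the loop is closed.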
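
Patches 14 and 15 rest on a small asyncio idiom: a coroutine that grinds through a large batch of synchronous work has to hand control back to the event loop every so often, otherwise signal handlers and other tasks are starved until the whole batch finishes. Below is a minimal sketch of that idiom; the batch size, the yield interval and the process() helper are invented for illustration and are not taken from the mempool code:

    import asyncio


    def process(item):
        '''Hypothetical per-item work, standing in for deserialising a tx.'''
        return item % 2


    async def import_batch(items, yield_every=50):
        '''Work through a batch, yielding to the event loop periodically.

        await asyncio.sleep(0) suspends just long enough for the loop
        to run pending callbacks (such as signal handlers) and other
        ready tasks before resuming the batch.'''
        done = 0
        for n, item in enumerate(items):
            if n % yield_every == 0:
                await asyncio.sleep(0)
            done += process(item)
        return done


    async def heartbeat():
        '''Runs interleaved with the batch thanks to the periodic yields.'''
        for _ in range(5):
            print('event loop still responsive')
            await asyncio.sleep(0.01)


    loop = asyncio.get_event_loop()
    batch_total, _ = loop.run_until_complete(
        asyncio.gather(import_batch(range(1000000)), heartbeat()))
    print('batch result:', batch_total)
    loop.close()

The trade-off is purely responsiveness against throughput: yielding more often (patch 15 drops the interval from 100 to 50) shortens the worst-case delay before a signal is handled, at the cost of a little extra event-loop overhead.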