From ccf24fdc7113c4393511e7d1c080a6af26f47686 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sat, 19 Nov 2016 17:13:52 +0900 Subject: [PATCH 1/9] Wait 20 secs for sockets to close 10 seems a little low --- server/protocol.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/protocol.py b/server/protocol.py index 07c4073..f7abb20 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -326,17 +326,17 @@ class ServerManager(LoggedClass): await asyncio.sleep(0) self.servers = [] self.logger.info('server listening sockets closed') - limit = time.time() + 10 + limit = time.time() + 15 while self.sessions and time.time() < limit: self.logger.info('{:,d} sessions remaining' .format(len(self.sessions))) - await asyncio.sleep(2) + await asyncio.sleep(3) if self.sessions: self.logger.info('forcibly closing {:,d} stragglers' .format(len(self.sessions))) for future in self.sessions.values(): future.cancel() - await asyncio.sleep(0) + await asyncio.sleep(1) def add_session(self, session): assert self.servers From 8617c82ec22c8e991ad3e3936577ffee00df14da Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sat, 19 Nov 2016 20:54:40 +0900 Subject: [PATCH 2/9] Floor disk_count at zero. --- server/block_processor.py | 2 +- server/db.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/server/block_processor.py b/server/block_processor.py index 9373bc9..1765878 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -828,7 +828,7 @@ class BlockProcessor(server.db.DB): def read_headers(self, start, count): # Read some from disk - disk_count = min(count, self.fs_height + 1 - start) + disk_count = min(count, max(0, self.fs_height + 1 - start)) result = self.fs_read_headers(start, disk_count) count -= disk_count start += disk_count diff --git a/server/db.py b/server/db.py index c1b3ff9..aa5e129 100644 --- a/server/db.py +++ b/server/db.py @@ -143,6 +143,7 @@ class DB(LoggedClass): raise def fs_read_headers(self, start, count): + '''Requires count >= 0.''' # Read some from disk disk_count = min(count, self.db_height + 1 - start) if start < 0 or count < 0 or disk_count != count: From dd5a31d0f4a86c61ba251817373c5cdb63228a3b Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sat, 19 Nov 2016 20:55:48 +0900 Subject: [PATCH 3/9] Take a little more care cleaning up connections --- server/protocol.py | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/server/protocol.py b/server/protocol.py index f7abb20..f75a08a 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -316,33 +316,36 @@ class ServerManager(LoggedClass): if self.irc_future: self.irc_future.cancel() for session in self.sessions: - session.transport.close() + self.close_session(session) async def wait_shutdown(self): # Wait for servers to close for server in self.servers: await server.wait_closed() - # Just in case a connection came in - await asyncio.sleep(0) self.servers = [] - self.logger.info('server listening sockets closed') - limit = time.time() + 15 + + secs = 60 + self.logger.info('server listening sockets closed, waiting ' + '{:d} seconds for socket cleanup'.format(secs)) + + limit = time.time() + secs while self.sessions and time.time() < limit: + await asyncio.sleep(4) self.logger.info('{:,d} sessions remaining' .format(len(self.sessions))) - await asyncio.sleep(3) - if self.sessions: - self.logger.info('forcibly closing {:,d} stragglers' - .format(len(self.sessions))) - for future in self.sessions.values(): - future.cancel() - await asyncio.sleep(1) def add_session(self, session): - assert self.servers - assert session not in self.sessions coro = session.serve_requests() - self.sessions[session] = asyncio.ensure_future(coro) + future = asyncio.ensure_future(coro) + self.sessions[session] = future + # Some connections are acknowledged after the servers are closed + if not self.servers: + self.close_session(session) + + def close_session(self, session): + '''Close the session's transport and cancel its future.''' + session.transport.close() + self.sessions[session].cancel() def remove_session(self, session): self.subscription_count -= session.sub_count() From 8b34d1c134fc049e4e3e7ee8b9ac1b94a4a9edd2 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sun, 20 Nov 2016 18:09:32 +0900 Subject: [PATCH 4/9] Log IRC errors, typically missing package --- server/irc.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/irc.py b/server/irc.py index 0a39dd8..3a938cd 100644 --- a/server/irc.py +++ b/server/irc.py @@ -55,6 +55,8 @@ class IRC(LoggedClass): await self.join() except asyncio.CancelledError: pass + except Exception as e: + self.logger.error(str(e)) async def join(self): import irc.client as irc_client From 98fd178c62ae63e6b12bfd3c1fd42a78b19bf81c Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sun, 20 Nov 2016 18:13:30 +0900 Subject: [PATCH 5/9] Make mempool truly asynchronous Mempool updates happen in parallel asynchronously to processing of new blocks once caught up. This means that, e.g., during the initial slow mempool download incoming blocks can be processed and communicated to clients without waiting for the downloaded mempool transaction analysis to complete. From a client's point of view the server won't be seen as lagging. --- server/block_processor.py | 97 +++++++++++++++++++-------------------- server/protocol.py | 43 +++++++++++------ 2 files changed, 77 insertions(+), 63 deletions(-) diff --git a/server/block_processor.py b/server/block_processor.py index 1765878..4e721f6 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -43,8 +43,8 @@ class Prefetcher(LoggedClass): self.semaphore = asyncio.Semaphore() self.queue = asyncio.Queue() self.queue_size = 0 + self.caught_up = False self.fetched_height = height - self.mempool_hashes = [] # Target cache size. Has little effect on sync time. self.target_cache_size = 10 * 1024 * 1024 # First fetch to be 10 blocks @@ -64,13 +64,14 @@ class Prefetcher(LoggedClass): self.fetched_height = height async def get_blocks(self): - '''Returns a list of prefetched blocks and the mempool.''' - blocks, height, size = await self.queue.get() + '''Blocking function that returns prefetched blocks. + + The returned result empty just once - when the prefetcher + has caught up with the daemon. + ''' + blocks, size = await self.queue.get() self.queue_size -= size - if height == self.daemon.cached_height(): - return blocks, self.mempool_hashes - else: - return blocks, None + return blocks async def main_loop(self): '''Loop forever polling for more blocks.''' @@ -78,39 +79,19 @@ class Prefetcher(LoggedClass): .format(await self.daemon.height())) while True: try: - if await self._caught_up(): - await asyncio.sleep(5) - else: - await asyncio.sleep(0) + with await self.semaphore: + await self._prefetch() + await asyncio.sleep(5 if self.caught_up else 0) except DaemonError as e: self.logger.info('ignoring daemon error: {}'.format(e)) except asyncio.CancelledError: break - async def _caught_up(self): - '''Poll for new blocks and mempool state. - - Mempool is only queried if caught up with daemon.''' - with await self.semaphore: - blocks, size = await self._prefetch() - self.fetched_height += len(blocks) - caught_up = self.fetched_height == self.daemon.cached_height() - if caught_up: - self.mempool_hashes = await self.daemon.mempool_hashes() - - # Wake up block processor if we have something - if blocks or caught_up: - self.queue.put_nowait((blocks, self.fetched_height, size)) - self.queue_size += size - - return caught_up - async def _prefetch(self): '''Prefetch blocks unless the prefetch queue is full.''' if self.queue_size >= self.target_cache_size: - return [], 0 + return - caught_up = self.daemon.cached_height() == self.fetched_height daemon_height = await self.daemon.height() cache_room = self.target_cache_size // self.ave_size @@ -119,15 +100,18 @@ class Prefetcher(LoggedClass): count = min(daemon_height - self.fetched_height, cache_room) count = min(4000, max(count, 0)) if not count: - return [], 0 + # Indicate when we have caught up for the first time only + if not self.caught_up: + self.caught_up = True + self.queue.put_nowait(([], 0)) + return first = self.fetched_height + 1 hex_hashes = await self.daemon.block_hex_hashes(first, count) - if caught_up: + if self.caught_up: self.logger.info('new block height {:,d} hash {}' .format(first + count - 1, hex_hashes[-1])) blocks = await self.daemon.raw_blocks(hex_hashes) - size = sum(len(block) for block in blocks) # Update our recent average block size estimate @@ -136,7 +120,9 @@ class Prefetcher(LoggedClass): else: self.ave_size = (size + (10 - count) * self.ave_size) // 10 - return blocks, size + self.fetched_height += len(blocks) + self.queue.put_nowait((blocks, size)) + self.queue_size += size class ChainReorg(Exception): @@ -162,6 +148,7 @@ class BlockProcessor(server.db.DB): self.daemon = Daemon(env.daemon_url, env.debug) self.daemon.debug_set_height(self.height) + self.caught_up = False self.touched = set() self.futures = [] @@ -223,41 +210,51 @@ class BlockProcessor(server.db.DB): await asyncio.sleep(0) async def _wait_for_update(self): - '''Wait for the prefetcher to deliver blocks or a mempool update. + '''Wait for the prefetcher to deliver blocks. - Blocks are only processed in the forward direction. The - prefetcher only provides a non-None mempool when caught up. + Blocks are only processed in the forward direction. ''' - blocks, mempool_hashes = await self.prefetcher.get_blocks() + blocks = await self.prefetcher.get_blocks() + if not blocks: + await self.first_caught_up() + return '''Strip the unspendable genesis coinbase.''' if self.height == -1: blocks[0] = blocks[0][:self.coin.HEADER_LEN] + bytes(1) - caught_up = mempool_hashes is not None try: for block in blocks: - self.advance_block(block, caught_up) - if not caught_up and time.time() > self.next_cache_check: - self.check_cache_size() - self.next_cache_check = time.time() + 60 + self.advance_block(block, self.caught_up) await asyncio.sleep(0) # Yield - if caught_up: - await self.caught_up(mempool_hashes) - self.touched = set() except ChainReorg: await self.handle_chain_reorg() - async def caught_up(self, mempool_hashes): + if self.caught_up: + # Flush everything as queries are performed on the DB and + # not in-memory. + self.flush(True) + self.notify(self.touched) + elif time.time() > self.next_cache_check: + self.check_cache_size() + self.next_cache_check = time.time() + 60 + self.touched = set() + + async def first_caught_up(self): '''Called after each deamon poll if caught up.''' - # Caught up to daemon height. Flush everything as queries - # are performed on the DB and not in-memory. + self.caught_up = True if self.first_sync: self.first_sync = False self.logger.info('{} synced to height {:,d}. DB version:' .format(VERSION, self.height, self.db_version)) self.flush(True) + def notify(self, touched): + '''Called with list of touched addresses by new blocks. + + Only called for blocks found after first_caught_up is called. + Intended to be overridden in derived classes.''' + async def handle_chain_reorg(self): # First get all state on disk self.logger.info('chain reorg detected') diff --git a/server/protocol.py b/server/protocol.py index f75a08a..08764c9 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -38,15 +38,16 @@ class BlockServer(BlockProcessor): super().__init__(env) self.server_mgr = ServerManager(self, env) self.mempool = MemPool(self) - self.caught_up_yet = False - async def caught_up(self, mempool_hashes): - # Call the base class to flush before doing anything else. - await super().caught_up(mempool_hashes) - if not self.caught_up_yet: - await self.server_mgr.start_servers() - self.caught_up_yet = True - self.touched.update(await self.mempool.update(mempool_hashes)) + async def first_caught_up(self): + # Call the base class to flush and log first + await super().first_caught_up() + await self.server_mgr.start_servers() + self.futures.append(self.mempool.start()) + + def notify(self, touched): + '''Called when addresses are touched by new blocks or mempool + updates.''' self.server_mgr.notify(self.height, self.touched) def on_cancel(self): @@ -97,13 +98,29 @@ class MemPool(LoggedClass): self.bp = bp self.count = -1 - async def update(self, hex_hashes): + def start(self): + '''Starts the mempool synchronization mainloop. Return a future.''' + return asyncio.ensure_future(self.main_loop()) + + async def main_loop(self): + '''Asynchronously maintain mempool status with daemon.''' + self.logger.info('maintaining state with daemon...') + while True: + try: + await self.update() + await asyncio.sleep(5) + except DaemonError as e: + self.logger.info('ignoring daemon error: {}'.format(e)) + except asyncio.CancelledError: + break + + async def update(self): '''Update state given the current mempool to the passed set of hashes. Remove transactions that are no longer in our mempool. Request new transactions we don't have then add to our mempool. ''' - hex_hashes = set(hex_hashes) + hex_hashes = set(await self.bp.daemon.mempool_hashes()) touched = set() missing_utxos = [] @@ -210,8 +227,7 @@ class MemPool(LoggedClass): self.logger.info('{:,d} txs touching {:,d} addresses' .format(len(self.txs), len(self.hash168s))) - # Might include a None - return touched + self.bp.notify(touched) def transactions(self, hash168): '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool @@ -310,11 +326,12 @@ class ServerManager(LoggedClass): def stop(self): '''Close listening servers.''' - self.logger.info('cleanly closing client sessions, please wait...') for server in self.servers: server.close() if self.irc_future: self.irc_future.cancel() + if self.sessions: + self.logger.info('cleanly closing client sessions, please wait...') for session in self.sessions: self.close_session(session) From 5eb92007aecc4ec6f0bcdf4830f962881869def9 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sun, 20 Nov 2016 18:18:30 +0900 Subject: [PATCH 6/9] Remove redundant log; IRC will emit one --- server/protocol.py | 1 - 1 file changed, 1 deletion(-) diff --git a/server/protocol.py b/server/protocol.py index 08764c9..e6d1004 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -311,7 +311,6 @@ class ServerManager(LoggedClass): await self.start_server('SSL', env.host, env.ssl_port, ssl=sslc) if env.irc: - self.logger.info('starting IRC coroutine') self.irc_future = asyncio.ensure_future(self.irc.start()) else: self.logger.info('IRC disabled') From 1dc43b3020b067f93b5eb0f075163f20859b6113 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sun, 20 Nov 2016 19:08:31 +0900 Subject: [PATCH 7/9] Show connection total in connection log Fix typo resulting in no mempool notifications --- server/protocol.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/protocol.py b/server/protocol.py index e6d1004..49f85cd 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -48,7 +48,7 @@ class BlockServer(BlockProcessor): def notify(self, touched): '''Called when addresses are touched by new blocks or mempool updates.''' - self.server_mgr.notify(self.height, self.touched) + self.server_mgr.notify(self.height, touched) def on_cancel(self): '''Called when the main loop is cancelled.''' @@ -354,6 +354,8 @@ class ServerManager(LoggedClass): coro = session.serve_requests() future = asyncio.ensure_future(coro) self.sessions[session] = future + self.logger.info('connection from {}, {:,d} total' + .format(session.peername(), len(self.sessions))) # Some connections are acknowledged after the servers are closed if not self.servers: self.close_session(session) @@ -437,7 +439,6 @@ class Session(JSONRPC): def connection_made(self, transport): '''Handle an incoming client connection.''' super().connection_made(transport) - self.logger.info('connection from {}'.format(self.peername())) self.manager.add_session(self) def connection_lost(self, exc): From 11558fd9d27b07032066865740ce0319b7ac52a8 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sun, 20 Nov 2016 19:08:31 +0900 Subject: [PATCH 8/9] Add debug log --- server/protocol.py | 1 + 1 file changed, 1 insertion(+) diff --git a/server/protocol.py b/server/protocol.py index 49f85cd..1fa08df 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -317,6 +317,7 @@ class ServerManager(LoggedClass): def notify(self, height, touched): '''Notify sessions about height changes and touched addresses.''' + self.logger.info('{:,d} addresses touched'.format(len(touched))) cache = {} for session in self.sessions: if isinstance(session, ElectrumX): From 87cdd2709de5925272574bb7708a6c317b136351 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sun, 20 Nov 2016 20:12:10 +0900 Subject: [PATCH 9/9] Implement daemon failover Daemon URLs can be comma-separated in the DAEMON_URL env var. Surrounding whitespace is stripped. http:// is preprended if missing. The coin's default port is supplied if missing. A trailing / is supplied if missing. Closes #33 --- README.rst | 38 ++++++++++++++++++++++++++++++-------- docs/ENV-NOTES | 10 ++++++---- docs/HOWTO.rst | 4 ++-- docs/RELEASE-NOTES | 11 +++++++++++ lib/coins.py | 19 +++++++++++++++++++ server/block_processor.py | 2 +- server/daemon.py | 28 ++++++++++++++++++++++------ server/env.py | 13 +------------ server/version.py | 2 +- 9 files changed, 93 insertions(+), 34 deletions(-) diff --git a/README.rst b/README.rst index 6c41e06..ba994c4 100644 --- a/README.rst +++ b/README.rst @@ -46,6 +46,28 @@ that could easily be reused for those alts that are reasonably compatible with Bitcoin. Such an abstraction is also useful for testnets, of course. +Features +======== + +- The full Electrum protocol is implemented with the exception of the + blockchain.address.get_proof RPC call, which is not used in normal + sessions and only sent from the Electrum command line. +- Efficient synchronization from Genesis. Recent hardware should + synchronize in well under 24 hours, possibly much faster for recent + CPUs or if you have an SSD. The fastest time to height 439k (mid + November 2016) reported is under 5 hours. Electrum-server would + probably take around 1 month. +- Subscription limiting both per-connection and across all connections. +- Minimal resource usage once caught up and serving clients; tracking the + transaction mempool seems to take the most memory. +- Each client is served asynchronously to all other clients and tasks, + so busy clients do not reduce responsiveness of other clients' + requests and notifications, or the processing of incoming blocks. +- Daemon failover. More than one daemon can be specified; ElectrumX + will failover round-robin style if the current one fails for any + reason. +- Coin abstraction makes compatible altcoin support easy. + Implementation ============== @@ -58,7 +80,7 @@ So how does it achieve a much more compact database than Electrum server, which is forced to prune hisory for busy addresses, and yet sync roughly 2 orders of magnitude faster? -I believe all of the following play a part: +I believe all of the following play a part:: - aggressive caching and batching of DB writes - more compact and efficient representation of UTXOs, address index, @@ -94,15 +116,15 @@ Roadmap Pre-1.0 - minor code cleanups - at most 1 more DB format change; I will make a weak attempt to retain 0.6 release's DB format if possible -- provision of configurable ways to limit client connections so as to - mitigate intentional or unintentional degradation of server response - time to other clients. Based on IRC discussion this will likely be a - combination of address subscription and bandwidth limits. +- provision of bandwidth limit controls +- implement simple protocol to discover peers without resorting to IRC Roadmap Post-1.0 ================ +- Python 3.6, which has several performance improvements relevant to + ElectrumX - UTXO root logic and implementation - improve DB abstraction so LMDB is not penalized - investigate effects of cache defaults and DB configuration defaults @@ -114,9 +136,9 @@ Database Format =============== The database and metadata formats of ElectrumX are likely to change. -Such changes will render old DBs unusable. At least until 1.0 I do -not intend to provide converters; moreover from-genesis sync time to -create a pristine database is quite tolerable. +Such changes will render old DBs unusable. For now I do not intend to +provide converters as the time taken from genesis to synchronize to a +pristine database is quite tolerable. Miscellany diff --git a/docs/ENV-NOTES b/docs/ENV-NOTES index fb62edd..88ec4c8 100644 --- a/docs/ENV-NOTES +++ b/docs/ENV-NOTES @@ -4,11 +4,13 @@ DB_DIRECTORY - path to the database directory (if relative, to `run` script) USERNAME - the username the server will run as if using `run` script ELECTRUMX - path to the electrumx_server.py script (if relative, to `run` script) -DAEMON_URL - the URL used to connect to the daemon. Should be of the form +DAEMON_URL - A comma-separated list of daemon URLS. If more than one is + provided ElectrumX will failover to the next when one stops + working. The generic form is: http://username:password@hostname:port/ - Alternatively you can specify DAEMON_USERNAME, DAEMON_PASSWORD, - DAEMON_HOST and DAEMON_PORT. DAEMON_PORT is optional and - will default appropriately for COIN. + The leading 'http://' is optional, as is the trailing + slash. The ':port' part is also optional and will default + to the standard RPC port for COIN if omitted. The other environment variables are all optional and will adopt sensible defaults if not specified. diff --git a/docs/HOWTO.rst b/docs/HOWTO.rst index 3f3fa5a..79fc35b 100644 --- a/docs/HOWTO.rst +++ b/docs/HOWTO.rst @@ -28,9 +28,9 @@ for someone used to either. When building the database form the genesis block, ElectrumX has to flush large quantities of data to disk and to leveldb. You will have a much nicer experience if the database directory is on an SSD than on -an HDD. Currently to around height 434,000 of the Bitcoin blockchain +an HDD. Currently to around height 439,800 of the Bitcoin blockchain the final size of the leveldb database, and other ElectrumX file -metadata comes to just over 17GB. Leveldb needs a bit more for brief +metadata comes to just over 18GB. Leveldb needs a bit more for brief periods, and the block chain is only getting longer, so I would recommend having at least 30-40GB free space. diff --git a/docs/RELEASE-NOTES b/docs/RELEASE-NOTES index 4b505c3..5de94dc 100644 --- a/docs/RELEASE-NOTES +++ b/docs/RELEASE-NOTES @@ -1,3 +1,14 @@ +version 0.7 +----------- + +- daemon failover is now supported; see docs/ENV-NOTES. As a result, + DAEMON_URL must now be supplied and DAEMON_USERNAME, DAEMON_PASSWORD, + DAEMON_HOST and DAEMON_PORT are no longer used. +- fixed a bug introduced in 0.6 series where some client header requests + would fail +- fully asynchronous mempool handling; blocks can be processed and clients + notified whilst the mempool is still being processed + version 0.6.3 ------------- diff --git a/lib/coins.py b/lib/coins.py index d872de3..f4dae22 100644 --- a/lib/coins.py +++ b/lib/coins.py @@ -14,6 +14,7 @@ necessary for appropriate handling. from decimal import Decimal from functools import partial import inspect +import re import struct import sys @@ -34,6 +35,7 @@ class Coin(object): # Not sure if these are coin-specific HEADER_LEN = 80 DEFAULT_RPC_PORT = 8332 + RPC_URL_REGEX = re.compile('.+@[^:]+(:[0-9]+)?') VALUE_PER_COIN = 100000000 CHUNK_SIZE=2016 STRANGE_VERBYTE = 0xff @@ -50,6 +52,23 @@ class Coin(object): raise CoinError('unknown coin {} and network {} combination' .format(name, net)) + @classmethod + def sanitize_url(cls, url): + # Remove surrounding ws and trailing /s + url = url.strip().rstrip('/') + match = cls.RPC_URL_REGEX.match(url) + if not match: + raise CoinError('invalid daemon URL: "{}"'.format(url)) + if match.groups()[0] is None: + url += ':{:d}'.format(cls.DEFAULT_RPC_PORT) + if not url.startswith('http://'): + url = 'http://' + url + return url + '/' + + @classmethod + def daemon_urls(cls, urls): + return [cls.sanitize_url(url) for url in urls.split(',')] + @cachedproperty def hash168_handlers(cls): return ScriptPubKey.PayToHandlers( diff --git a/server/block_processor.py b/server/block_processor.py index 4e721f6..05b0031 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -146,7 +146,7 @@ class BlockProcessor(server.db.DB): self.tip = self.db_tip self.tx_count = self.db_tx_count - self.daemon = Daemon(env.daemon_url, env.debug) + self.daemon = Daemon(self.coin.daemon_urls(env.daemon_url), env.debug) self.daemon.debug_set_height(self.height) self.caught_up = False self.touched = set() diff --git a/server/daemon.py b/server/daemon.py index 241b85e..3c9e545 100644 --- a/server/daemon.py +++ b/server/daemon.py @@ -27,11 +27,15 @@ class Daemon(util.LoggedClass): class DaemonWarmingUpError(Exception): '''Raised when the daemon returns an error in its results.''' - def __init__(self, url, debug): + def __init__(self, urls, debug): super().__init__() - self.url = url + if not urls: + raise DaemonError('no daemon URLs provided') + for url in urls: + self.logger.info('daemon at {}'.format(self.logged_url(url))) + self.urls = urls + self.url_index = 0 self._height = None - self.logger.info('connecting at URL {}'.format(url)) self.debug_caught_up = 'caught_up' in debug # Limit concurrent RPC calls to this number. # See DEFAULT_HTTP_WORKQUEUE in bitcoind, which is typically 16 @@ -64,10 +68,12 @@ class Daemon(util.LoggedClass): data = json.dumps(payload) secs = 1 + max_secs = 16 while True: try: async with self.workqueue_semaphore: - async with aiohttp.post(self.url, data=data) as resp: + url = self.urls[self.url_index] + async with aiohttp.post(url, data=data) as resp: result = processor(await resp.json()) if self.prior_msg: self.logger.info('connection restored') @@ -86,8 +92,18 @@ class Daemon(util.LoggedClass): raise except Exception as e: log_error('request gave unexpected error: {}.'.format(e)) - await asyncio.sleep(secs) - secs = min(16, secs * 2) + if secs >= max_secs and len(self.urls) > 1: + self.url_index = (self.url_index + 1) % len(self.urls) + logged_url = self.logged_url(self.urls[self.url_index]) + self.logger.info('failing over to {}'.format(logged_url)) + secs = 1 + else: + await asyncio.sleep(secs) + secs = min(16, secs * 2) + + def logged_url(self, url): + '''The host and port part, for logging.''' + return url[url.rindex('@') + 1:] async def _send_single(self, method, params=None): '''Send a single request to the daemon.''' diff --git a/server/env.py b/server/env.py index 2c490f2..beb1bc2 100644 --- a/server/env.py +++ b/server/env.py @@ -30,7 +30,7 @@ class Env(LoggedClass): self.hist_MB = self.integer('HIST_MB', 300) self.host = self.default('HOST', 'localhost') self.reorg_limit = self.integer('REORG_LIMIT', self.coin.REORG_LIMIT) - self.daemon_url = self.build_daemon_url() + self.daemon_url = self.required('DAEMON_URL') # Server stuff self.tcp_port = self.integer('TCP_PORT', None) self.ssl_port = self.integer('SSL_PORT', None) @@ -74,14 +74,3 @@ class Env(LoggedClass): except: raise self.Error('cannot convert envvar {} value {} to an integer' .format(envvar, value)) - - def build_daemon_url(self): - daemon_url = environ.get('DAEMON_URL') - if not daemon_url: - username = self.required('DAEMON_USERNAME') - password = self.required('DAEMON_PASSWORD') - host = self.required('DAEMON_HOST') - port = self.default('DAEMON_PORT', self.coin.DEFAULT_RPC_PORT) - daemon_url = ('http://{}:{}@{}:{}/' - .format(username, password, host, port)) - return daemon_url diff --git a/server/version.py b/server/version.py index 5a9fd44..448eda3 100644 --- a/server/version.py +++ b/server/version.py @@ -1 +1 @@ -VERSION = "ElectrumX 0.6.3" +VERSION = "ElectrumX 0.7"