Merge branch 'develop'
This commit is contained in:
commit
20488a3b04
@ -137,10 +137,17 @@ version prior to the release of 1.0.
|
||||
ChangeLog
|
||||
=========
|
||||
|
||||
Version 0.10.6
|
||||
--------------
|
||||
|
||||
* fix for rest of second part of issue `#100`_
|
||||
* check HTTP error codes from bitcoind and log appropriately
|
||||
* don't error opening a new DB that has nothing written yet
|
||||
|
||||
Version 0.10.5
|
||||
--------------
|
||||
|
||||
* fix for second part of issue `#100`_ where the ElectrumX was not
|
||||
* fix for some of second part of issue `#100`_ where the ElectrumX was not
|
||||
killable if bitcoind was unavailable
|
||||
|
||||
|
||||
|
||||
@ -151,7 +151,6 @@ class BlockProcessor(server.db.DB):
|
||||
|
||||
self.caught_up_event = asyncio.Event()
|
||||
self.task_queue = asyncio.Queue()
|
||||
self.stop = False
|
||||
|
||||
# Meta
|
||||
self.cache_MB = env.cache_MB
|
||||
@ -189,26 +188,19 @@ class BlockProcessor(server.db.DB):
|
||||
'''Called by the prefetcher when it first catches up.'''
|
||||
self.add_task(self.first_caught_up)
|
||||
|
||||
def on_shutdown(self):
|
||||
'''Called by the controller to shut processing down.'''
|
||||
async def do_nothing():
|
||||
pass
|
||||
self.logger.info('preparing clean shutdown')
|
||||
self.stop = True
|
||||
# Ensure something is on the queue so main_loop notices self.stop
|
||||
self.add_task(do_nothing)
|
||||
|
||||
async def main_loop(self):
|
||||
'''Main loop for block processing.'''
|
||||
await self.prefetcher.reset_height()
|
||||
|
||||
while not self.stop:
|
||||
while True:
|
||||
task = await self.task_queue.get()
|
||||
await task()
|
||||
|
||||
self.logger.info('flushing state to DB for a clean shutdown...')
|
||||
await self.executor(self.flush, True)
|
||||
self.logger.info('shutdown complete')
|
||||
def shutdown(self):
|
||||
if self.height != self.db_height:
|
||||
self.logger.info('flushing state to DB for a clean shutdown...')
|
||||
self.flush(True)
|
||||
self.logger.info('shutdown complete')
|
||||
|
||||
async def executor(self, func, *args, **kwargs):
|
||||
'''Run func taking args in the executor.'''
|
||||
|
||||
@ -13,6 +13,7 @@ import ssl
|
||||
import time
|
||||
from bisect import bisect_left
|
||||
from collections import defaultdict
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from functools import partial
|
||||
|
||||
import pylru
|
||||
@ -53,6 +54,8 @@ class Controller(util.LoggedClass):
|
||||
# Set this event to cleanly shutdown
|
||||
self.shutdown_event = asyncio.Event()
|
||||
self.loop = asyncio.get_event_loop()
|
||||
self.executor = ThreadPoolExecutor()
|
||||
self.loop.set_default_executor(self.executor)
|
||||
self.start = time.time()
|
||||
self.coin = env.coin
|
||||
self.daemon = Daemon(env.coin.daemon_urls(env.daemon_url))
|
||||
@ -215,8 +218,8 @@ class Controller(util.LoggedClass):
|
||||
for n in range(4):
|
||||
add_future(self.serve_requests())
|
||||
|
||||
bp_future = asyncio.ensure_future(self.bp.main_loop())
|
||||
futures = []
|
||||
add_future(self.bp.main_loop())
|
||||
add_future(self.bp.prefetcher.main_loop())
|
||||
add_future(await_bp_catchup())
|
||||
|
||||
@ -225,35 +228,36 @@ class Controller(util.LoggedClass):
|
||||
self.logger.info('shutting down gracefully')
|
||||
self.state = self.SHUTTING_DOWN
|
||||
|
||||
# First tell the block processor to shut down, it may need to
|
||||
# perform a lengthy flush. Then shut down the rest.
|
||||
self.bp.on_shutdown()
|
||||
# Close servers and sessions
|
||||
self.close_servers(list(self.servers.keys()))
|
||||
for session in self.sessions:
|
||||
self.close_session(session)
|
||||
|
||||
# Cancel the futures
|
||||
for future in futures:
|
||||
future.cancel()
|
||||
|
||||
# Now wait for the cleanup to complete
|
||||
await self.close_sessions()
|
||||
if not bp_future.done():
|
||||
self.logger.info('waiting for block processor')
|
||||
await bp_future
|
||||
await asyncio.wait(futures)
|
||||
|
||||
# Wait for the executor to finish anything it's doing
|
||||
self.executor.shutdown()
|
||||
self.bp.shutdown()
|
||||
|
||||
def close_servers(self, kinds):
|
||||
'''Close the servers of the given kinds (TCP etc.).'''
|
||||
self.logger.info('closing down {} listening servers'
|
||||
.format(', '.join(kinds)))
|
||||
if kinds:
|
||||
self.logger.info('closing down {} listening servers'
|
||||
.format(', '.join(kinds)))
|
||||
for kind in kinds:
|
||||
server = self.servers.pop(kind, None)
|
||||
if server:
|
||||
server.close()
|
||||
|
||||
async def close_sessions(self, secs=30):
|
||||
async def wait_for_sessions(self, secs=30):
|
||||
if not self.sessions:
|
||||
return
|
||||
self.logger.info('waiting up to {:d} seconds for socket cleanup'
|
||||
.format(secs))
|
||||
for session in self.sessions:
|
||||
self.close_session(session)
|
||||
limit = time.time() + secs
|
||||
while self.sessions and time.time() < limit:
|
||||
self.clear_stale_sessions(grace=secs//2)
|
||||
|
||||
@ -70,10 +70,13 @@ class Daemon(util.LoggedClass):
|
||||
async with self.workqueue_semaphore:
|
||||
url = self.urls[self.url_index]
|
||||
async with aiohttp.post(url, data=data) as resp:
|
||||
result = processor(await resp.json())
|
||||
if self.prior_msg:
|
||||
self.logger.info('connection restored')
|
||||
return result
|
||||
if resp.status == 200:
|
||||
if self.prior_msg:
|
||||
self.logger.info('connection restored')
|
||||
result = processor(await resp.json())
|
||||
return result
|
||||
log_error('HTTP error code {:d}: {}'
|
||||
.format(resp.status, resp.reason))
|
||||
except asyncio.TimeoutError:
|
||||
log_error('timeout error.', skip_once=True)
|
||||
except aiohttp.ClientHttpProcessingError:
|
||||
|
||||
@ -123,7 +123,8 @@ class DB(util.LoggedClass):
|
||||
.format(util.formatted_time(self.wall_time)))
|
||||
|
||||
def read_utxo_state(self):
|
||||
if self.utxo_db.is_new:
|
||||
state = self.utxo_db.get(b'state')
|
||||
if not state:
|
||||
self.db_height = -1
|
||||
self.db_tx_count = 0
|
||||
self.db_tip = b'\0' * 32
|
||||
@ -132,9 +133,7 @@ class DB(util.LoggedClass):
|
||||
self.wall_time = 0
|
||||
self.first_sync = True
|
||||
else:
|
||||
state = self.utxo_db.get(b'state')
|
||||
if state:
|
||||
state = ast.literal_eval(state.decode())
|
||||
state = ast.literal_eval(state.decode())
|
||||
if not isinstance(state, dict):
|
||||
raise self.DBError('failed reading state from DB')
|
||||
self.db_version = state['db_version']
|
||||
|
||||
@ -1 +1 @@
|
||||
VERSION = "ElectrumX 0.10.5"
|
||||
VERSION = "ElectrumX 0.10.6"
|
||||
|
||||
Loading…
Reference in New Issue
Block a user