Merge branch 'stop_log_spew' into develop
This commit is contained in:
commit
ac13ad5bda
@ -208,8 +208,7 @@ class BlockProcessor(server.db.DB):
|
|||||||
await self._wait_for_update()
|
await self._wait_for_update()
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
self.on_cancel()
|
self.on_cancel()
|
||||||
# This lets the asyncio subsystem process futures cancellations
|
await self.wait_shutdown()
|
||||||
await asyncio.sleep(0)
|
|
||||||
|
|
||||||
def on_cancel(self):
|
def on_cancel(self):
|
||||||
'''Called when the main loop is cancelled.
|
'''Called when the main loop is cancelled.
|
||||||
@ -219,6 +218,10 @@ class BlockProcessor(server.db.DB):
|
|||||||
future.cancel()
|
future.cancel()
|
||||||
self.flush(True)
|
self.flush(True)
|
||||||
|
|
||||||
|
async def wait_shutdown(self):
|
||||||
|
'''Wait for shutdown to complete cleanly, and return.'''
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
|
||||||
async def _wait_for_update(self):
|
async def _wait_for_update(self):
|
||||||
'''Wait for the prefetcher to deliver blocks or a mempool update.
|
'''Wait for the prefetcher to deliver blocks or a mempool update.
|
||||||
|
|
||||||
|
|||||||
@ -54,6 +54,11 @@ class BlockServer(BlockProcessor):
|
|||||||
self.server_mgr.stop()
|
self.server_mgr.stop()
|
||||||
super().on_cancel()
|
super().on_cancel()
|
||||||
|
|
||||||
|
async def wait_shutdown(self):
|
||||||
|
'''Wait for shutdown to complete cleanly, and return.'''
|
||||||
|
await self.server_mgr.wait_shutdown()
|
||||||
|
await super().wait_shutdown()
|
||||||
|
|
||||||
def mempool_transactions(self, hash168):
|
def mempool_transactions(self, hash168):
|
||||||
'''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
|
'''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
|
||||||
entries for the hash168.
|
entries for the hash168.
|
||||||
@ -141,7 +146,7 @@ class MemPool(LoggedClass):
|
|||||||
|
|
||||||
for n, (hex_hash, tx) in enumerate(new_txs.items()):
|
for n, (hex_hash, tx) in enumerate(new_txs.items()):
|
||||||
# Yield to process e.g. signals
|
# Yield to process e.g. signals
|
||||||
if n % 100 == 0:
|
if n % 20 == 0:
|
||||||
await asyncio.sleep(0)
|
await asyncio.sleep(0)
|
||||||
txout_pairs = [txout_pair(txout) for txout in tx.outputs]
|
txout_pairs = [txout_pair(txout) for txout in tx.outputs]
|
||||||
self.txs[hex_hash] = (None, txout_pairs, None)
|
self.txs[hex_hash] = (None, txout_pairs, None)
|
||||||
@ -162,8 +167,7 @@ class MemPool(LoggedClass):
|
|||||||
# Now add the inputs
|
# Now add the inputs
|
||||||
for n, (hex_hash, tx) in enumerate(new_txs.items()):
|
for n, (hex_hash, tx) in enumerate(new_txs.items()):
|
||||||
# Yield to process e.g. signals
|
# Yield to process e.g. signals
|
||||||
if n % 10 == 0:
|
await asyncio.sleep(0)
|
||||||
await asyncio.sleep(0)
|
|
||||||
|
|
||||||
if initial and time.time() > next_log:
|
if initial and time.time() > next_log:
|
||||||
next_log = time.time() + 20
|
next_log = time.time() + 20
|
||||||
@ -248,7 +252,7 @@ class ServerManager(LoggedClass):
|
|||||||
self.sessions = {}
|
self.sessions = {}
|
||||||
self.max_subs = env.max_subs
|
self.max_subs = env.max_subs
|
||||||
self.subscription_count = 0
|
self.subscription_count = 0
|
||||||
self.futures = [] # At present just the IRC future, if any
|
self.irc_future = None
|
||||||
self.logger.info('max subscriptions across all sessions: {:,d}'
|
self.logger.info('max subscriptions across all sessions: {:,d}'
|
||||||
.format(self.max_subs))
|
.format(self.max_subs))
|
||||||
self.logger.info('max subscriptions per session: {:,d}'
|
self.logger.info('max subscriptions per session: {:,d}'
|
||||||
@ -263,8 +267,6 @@ class ServerManager(LoggedClass):
|
|||||||
host, port = args[:2]
|
host, port = args[:2]
|
||||||
try:
|
try:
|
||||||
self.servers.append(await server)
|
self.servers.append(await server)
|
||||||
except asyncio.CancelledError:
|
|
||||||
raise
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.logger.error('{} server failed to listen on {}:{:d} :{}'
|
self.logger.error('{} server failed to listen on {}:{:d} :{}'
|
||||||
.format(kind, host, port, e))
|
.format(kind, host, port, e))
|
||||||
@ -294,7 +296,7 @@ class ServerManager(LoggedClass):
|
|||||||
|
|
||||||
if env.irc:
|
if env.irc:
|
||||||
self.logger.info('starting IRC coroutine')
|
self.logger.info('starting IRC coroutine')
|
||||||
self.futures.append(asyncio.ensure_future(self.irc.start()))
|
self.irc_future = asyncio.ensure_future(self.irc.start())
|
||||||
else:
|
else:
|
||||||
self.logger.info('IRC disabled')
|
self.logger.info('IRC disabled')
|
||||||
|
|
||||||
@ -308,24 +310,42 @@ class ServerManager(LoggedClass):
|
|||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
'''Close listening servers.'''
|
'''Close listening servers.'''
|
||||||
|
self.logger.info('cleanly closing client sessions, please wait...')
|
||||||
for server in self.servers:
|
for server in self.servers:
|
||||||
server.close()
|
server.close()
|
||||||
|
if self.irc_future:
|
||||||
|
self.irc_future.cancel()
|
||||||
|
for session in self.sessions:
|
||||||
|
session.transport.close()
|
||||||
|
|
||||||
|
async def wait_shutdown(self):
|
||||||
|
# Wait for servers to close
|
||||||
|
for server in self.servers:
|
||||||
|
await server.wait_closed()
|
||||||
|
# Just in case a connection came in
|
||||||
|
await asyncio.sleep(0)
|
||||||
self.servers = []
|
self.servers = []
|
||||||
for future in self.futures:
|
self.logger.info('server listening sockets closed')
|
||||||
future.cancel()
|
limit = time.time() + 10
|
||||||
self.futures = []
|
while self.sessions and time.time() < limit:
|
||||||
sessions = list(self.sessions.keys()) # A copy
|
self.logger.info('{:,d} sessions remaining'
|
||||||
for session in sessions:
|
.format(len(self.sessions)))
|
||||||
self.remove_session(session)
|
await asyncio.sleep(2)
|
||||||
|
if self.sessions:
|
||||||
|
self.logger.info('forcibly closing {:,d} stragglers'
|
||||||
|
.format(len(self.sessions)))
|
||||||
|
for future in self.sessions.values():
|
||||||
|
future.cancel()
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
|
||||||
def add_session(self, session):
|
def add_session(self, session):
|
||||||
|
assert self.servers
|
||||||
assert session not in self.sessions
|
assert session not in self.sessions
|
||||||
coro = session.serve_requests()
|
coro = session.serve_requests()
|
||||||
self.sessions[session] = asyncio.ensure_future(coro)
|
self.sessions[session] = asyncio.ensure_future(coro)
|
||||||
|
|
||||||
def remove_session(self, session):
|
def remove_session(self, session):
|
||||||
if isinstance(session, ElectrumX):
|
self.subscription_count -= session.sub_count()
|
||||||
self.subscription_count -= len(session.hash168s)
|
|
||||||
future = self.sessions.pop(session)
|
future = self.sessions.pop(session)
|
||||||
future.cancel()
|
future.cancel()
|
||||||
|
|
||||||
@ -346,12 +366,6 @@ class ServerManager(LoggedClass):
|
|||||||
|
|
||||||
async def rpc_getinfo(self, params):
|
async def rpc_getinfo(self, params):
|
||||||
'''The RPC 'getinfo' call.'''
|
'''The RPC 'getinfo' call.'''
|
||||||
# FIXME: remove later
|
|
||||||
indep_count = sum(len(session.hash168s) for session in self.sessions
|
|
||||||
if isinstance(session, ElectrumX))
|
|
||||||
if indep_count != self.subscription_count:
|
|
||||||
self.logger.error('sub count {:,d} but session total {:,d}'
|
|
||||||
.format(self.subscription_count, indep_count))
|
|
||||||
return {
|
return {
|
||||||
'blocks': self.bp.height,
|
'blocks': self.bp.height,
|
||||||
'peers': len(self.irc.peers),
|
'peers': len(self.irc.peers),
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user