Make notifications properly async

This commit is contained in:
Neil Booth 2018-07-22 10:10:56 +08:00
parent 277e2447c0
commit 35506f6054
3 changed files with 31 additions and 36 deletions

View File

@ -39,6 +39,14 @@ class ChainState(object):
self.mempool_value = self._mempool.value
self.tx_branch_and_root = self._bp.merkle.branch_and_root
self.read_headers = self._bp.read_headers
# Cache maintenance
notifications.add_callback(self._notify)
async def _notify(self, height, touched):
# Invalidate our history cache for touched hashXs
hc = self._history_cache
for hashX in set(hc).intersection(touched):
del hc[hashX]
async def broadcast_transaction(self, raw_tx):
return await self._daemon.sendrawtransaction([raw_tx])
@ -82,11 +90,6 @@ class ChainState(object):
def header_branch_and_root(self, length, height):
return self._bp.header_mc.branch_and_root(length, height)
def invalidate_history_cache(self, touched):
hc = self._history_cache
for hashX in set(hc).intersection(touched):
del hc[hashX]
def processing_new_block(self):
'''Return True if we're processing a new block.'''
return self._daemon.cached_height() > self.db_height()

View File

@ -33,6 +33,7 @@ class Notifications(object):
self._touched_mp = {}
self._touched_bp = {}
self._highest_block = 0
self._notify_funcs = set()
async def _maybe_notify(self):
tmp, tbp = self._touched_mp, self._touched_bp
@ -52,7 +53,11 @@ class Notifications(object):
del tmp[old]
for old in [h for h in tbp if h <= height]:
del tbp[old]
await self.notify_sessions(touched, height)
for notify_func in self._notify_funcs:
await notify_func(height, touched)
def add_callback(self, notify_func):
self._notify_funcs.add(notify_func)
async def on_mempool(self, touched, height):
self._touched_mp[height] = touched
@ -63,9 +68,6 @@ class Notifications(object):
self._highest_block = height
await self._maybe_notify()
async def notify_sessions(self, touched, height):
pass
class Controller(ServerBase):
'''Manages server initialisation and stutdown.

View File

@ -104,7 +104,6 @@ class SessionManager(object):
self.tasks = tasks
self.chain_state = chain_state
self.peer_mgr = peer_mgr
self.notifications = notifications
self.shutdown_event = shutdown_event
self.logger = util.class_logger(__name__, self.__class__.__name__)
self.servers = {}
@ -125,7 +124,9 @@ class SessionManager(object):
self.mn_cache = []
# Event triggered when electrumx is listening for incoming requests.
self.server_listening = asyncio.Event()
notifications.notify_sessions = self.notify_sessions
# Tell sessions about subscription changes
notifications.add_callback(self._notify_sessions)
# Set up the RPC request handlers
cmds = ('add_peer daemon_url disconnect getinfo groups log peers '
'reorg sessions stop'.split())
@ -432,17 +433,11 @@ class SessionManager(object):
'''The number of connections that we've sent something to.'''
return len(self.sessions)
async def notify_sessions(self, touched, height):
async def _notify_sessions(self, height, touched):
'''Notify sessions about height changes and touched addresses.'''
self.chain_state.invalidate_history_cache(touched)
# Height notifications are synchronous. Those sessions with
# touched addresses are scheduled for asynchronous completion
create_task = self.tasks.create_task
for session in self.sessions:
if isinstance(session, LocalRPC):
continue
session_touched = session.notify(height, touched)
if session_touched is not None:
self.tasks.create_task(session.notify_async(session_touched))
create_task(session.notify(height, touched))
def add_session(self, session):
self.sessions.add(session)
@ -497,6 +492,9 @@ class SessionBase(ServerSession):
self.bw_limit = self.env.bandwidth_limit
self._orig_mr = self.rpc.message_received
async def notify(self, height, touched):
pass
def peer_address_str(self, *, for_log=True):
'''Returns the peer's IP address and port as a human-readable
string, respecting anon logs if the output is for a log.'''
@ -618,7 +616,7 @@ class ElectrumX(SessionBase):
def sub_count(self):
return len(self.hashX_subs)
async def notify_async(self, our_touched):
async def notify_touched(self, our_touched):
changed = {}
for hashX in our_touched:
@ -648,7 +646,7 @@ class ElectrumX(SessionBase):
self.logger.info('notified of {:,d} address{}'
.format(len(changed), es))
def notify(self, height, touched):
async def notify(self, height, touched):
'''Notify the client about changes to touched addresses (from mempool
updates or new blocks) and height.
@ -665,11 +663,9 @@ class ElectrumX(SessionBase):
args = (self.subscribe_headers_result(height), )
self.send_notification('blockchain.headers.subscribe', args)
our_touched = touched.intersection(self.hashX_subs)
if our_touched or (height_changed and self.mempool_statuses):
return our_touched
return None
touched = touched.intersection(self.hashX_subs)
if touched or (height_changed and self.mempool_statuses):
await self.notify_touched(touched)
def assert_boolean(self, value):
'''Return param value it is boolean otherwise raise an RPCError.'''
@ -1221,21 +1217,15 @@ class DashElectrumX(ElectrumX):
'masternode.list': self.masternode_list
})
async def notify_masternodes_async(self):
async def notify(self, height, touched):
'''Notify the client about changes in masternode list.'''
await super().notify(height, touched)
for mn in self.mns:
status = await self.daemon_request('masternode_list',
['status', mn])
self.send_notification('masternode.subscribe',
[mn, status.get(mn)])
def notify(self, height, touched):
'''Notify the client about changes in masternode list.'''
result = super().notify(height, touched)
# FIXME: the notifications should be done synchronously and the
# master node list fetched once asynchronously
self.session_mgr.tasks.create_task(self.notify_masternodes_async())
return result
# Masternode command handlers
async def masternode_announce_broadcast(self, signmnb):
'''Pass through the masternode announce message to be broadcast