From 35506f60546cf81434f84b943a11e15745662d93 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sun, 22 Jul 2018 10:10:56 +0800 Subject: [PATCH] Make notifications properly async --- electrumx/server/chain_state.py | 13 ++++++---- electrumx/server/controller.py | 10 +++++--- electrumx/server/session.py | 44 +++++++++++++-------------------- 3 files changed, 31 insertions(+), 36 deletions(-) diff --git a/electrumx/server/chain_state.py b/electrumx/server/chain_state.py index 5609ed8..bdc4233 100644 --- a/electrumx/server/chain_state.py +++ b/electrumx/server/chain_state.py @@ -39,6 +39,14 @@ class ChainState(object): self.mempool_value = self._mempool.value self.tx_branch_and_root = self._bp.merkle.branch_and_root self.read_headers = self._bp.read_headers + # Cache maintenance + notifications.add_callback(self._notify) + + async def _notify(self, height, touched): + # Invalidate our history cache for touched hashXs + hc = self._history_cache + for hashX in set(hc).intersection(touched): + del hc[hashX] async def broadcast_transaction(self, raw_tx): return await self._daemon.sendrawtransaction([raw_tx]) @@ -82,11 +90,6 @@ class ChainState(object): def header_branch_and_root(self, length, height): return self._bp.header_mc.branch_and_root(length, height) - def invalidate_history_cache(self, touched): - hc = self._history_cache - for hashX in set(hc).intersection(touched): - del hc[hashX] - def processing_new_block(self): '''Return True if we're processing a new block.''' return self._daemon.cached_height() > self.db_height() diff --git a/electrumx/server/controller.py b/electrumx/server/controller.py index d126e5d..7c766ef 100644 --- a/electrumx/server/controller.py +++ b/electrumx/server/controller.py @@ -33,6 +33,7 @@ class Notifications(object): self._touched_mp = {} self._touched_bp = {} self._highest_block = 0 + self._notify_funcs = set() async def _maybe_notify(self): tmp, tbp = self._touched_mp, self._touched_bp @@ -52,7 +53,11 @@ class Notifications(object): del tmp[old] for old in [h for h in tbp if h <= height]: del tbp[old] - await self.notify_sessions(touched, height) + for notify_func in self._notify_funcs: + await notify_func(height, touched) + + def add_callback(self, notify_func): + self._notify_funcs.add(notify_func) async def on_mempool(self, touched, height): self._touched_mp[height] = touched @@ -63,9 +68,6 @@ class Notifications(object): self._highest_block = height await self._maybe_notify() - async def notify_sessions(self, touched, height): - pass - class Controller(ServerBase): '''Manages server initialisation and stutdown. diff --git a/electrumx/server/session.py b/electrumx/server/session.py index 81196fe..73c5355 100644 --- a/electrumx/server/session.py +++ b/electrumx/server/session.py @@ -104,7 +104,6 @@ class SessionManager(object): self.tasks = tasks self.chain_state = chain_state self.peer_mgr = peer_mgr - self.notifications = notifications self.shutdown_event = shutdown_event self.logger = util.class_logger(__name__, self.__class__.__name__) self.servers = {} @@ -125,7 +124,9 @@ class SessionManager(object): self.mn_cache = [] # Event triggered when electrumx is listening for incoming requests. self.server_listening = asyncio.Event() - notifications.notify_sessions = self.notify_sessions + # Tell sessions about subscription changes + notifications.add_callback(self._notify_sessions) + # Set up the RPC request handlers cmds = ('add_peer daemon_url disconnect getinfo groups log peers ' 'reorg sessions stop'.split()) @@ -432,17 +433,11 @@ class SessionManager(object): '''The number of connections that we've sent something to.''' return len(self.sessions) - async def notify_sessions(self, touched, height): + async def _notify_sessions(self, height, touched): '''Notify sessions about height changes and touched addresses.''' - self.chain_state.invalidate_history_cache(touched) - # Height notifications are synchronous. Those sessions with - # touched addresses are scheduled for asynchronous completion + create_task = self.tasks.create_task for session in self.sessions: - if isinstance(session, LocalRPC): - continue - session_touched = session.notify(height, touched) - if session_touched is not None: - self.tasks.create_task(session.notify_async(session_touched)) + create_task(session.notify(height, touched)) def add_session(self, session): self.sessions.add(session) @@ -497,6 +492,9 @@ class SessionBase(ServerSession): self.bw_limit = self.env.bandwidth_limit self._orig_mr = self.rpc.message_received + async def notify(self, height, touched): + pass + def peer_address_str(self, *, for_log=True): '''Returns the peer's IP address and port as a human-readable string, respecting anon logs if the output is for a log.''' @@ -618,7 +616,7 @@ class ElectrumX(SessionBase): def sub_count(self): return len(self.hashX_subs) - async def notify_async(self, our_touched): + async def notify_touched(self, our_touched): changed = {} for hashX in our_touched: @@ -648,7 +646,7 @@ class ElectrumX(SessionBase): self.logger.info('notified of {:,d} address{}' .format(len(changed), es)) - def notify(self, height, touched): + async def notify(self, height, touched): '''Notify the client about changes to touched addresses (from mempool updates or new blocks) and height. @@ -665,11 +663,9 @@ class ElectrumX(SessionBase): args = (self.subscribe_headers_result(height), ) self.send_notification('blockchain.headers.subscribe', args) - our_touched = touched.intersection(self.hashX_subs) - if our_touched or (height_changed and self.mempool_statuses): - return our_touched - - return None + touched = touched.intersection(self.hashX_subs) + if touched or (height_changed and self.mempool_statuses): + await self.notify_touched(touched) def assert_boolean(self, value): '''Return param value it is boolean otherwise raise an RPCError.''' @@ -1221,21 +1217,15 @@ class DashElectrumX(ElectrumX): 'masternode.list': self.masternode_list }) - async def notify_masternodes_async(self): + async def notify(self, height, touched): + '''Notify the client about changes in masternode list.''' + await super().notify(height, touched) for mn in self.mns: status = await self.daemon_request('masternode_list', ['status', mn]) self.send_notification('masternode.subscribe', [mn, status.get(mn)]) - def notify(self, height, touched): - '''Notify the client about changes in masternode list.''' - result = super().notify(height, touched) - # FIXME: the notifications should be done synchronously and the - # master node list fetched once asynchronously - self.session_mgr.tasks.create_task(self.notify_masternodes_async()) - return result - # Masternode command handlers async def masternode_announce_broadcast(self, signmnb): '''Pass through the masternode announce message to be broadcast