From cb33dd115fc55b518cbc18ef7b39cd246755b502 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 27 Nov 2017 18:18:09 +0900 Subject: [PATCH] Clean up client notifications - mempool informed of new block; it notifies controller synchronously - controller notifies sessions synchronously - sessions are notified of new height synchronously. Any address touch notifications are returned to the controller and scheduled asynchronously. Also, remove a redundant notification of height on initial header subscriptions - the subscription response gives the current height; we also used to send a notification as we didn't update our idea of notified height. --- server/block_processor.py | 3 +- server/controller.py | 34 +++++++-------- server/mempool.py | 17 ++++++-- server/session.py | 88 +++++++++++++++++++++------------------ 4 files changed, 78 insertions(+), 64 deletions(-) diff --git a/server/block_processor.py b/server/block_processor.py index bbeba85..2b2840d 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -253,6 +253,8 @@ class BlockProcessor(server.db.DB): self.logger.info('processed {:,d} block{} in {:.1f}s' .format(len(blocks), s, time.time() - start)) + self.controller.mempool.on_new_block(self.touched) + self.touched.clear() elif hprevs[0] != chain[0]: await self.reorg_chain() else: @@ -511,7 +513,6 @@ class BlockProcessor(server.db.DB): if self.caught_up_event.is_set(): self.flush(True) else: - self.touched.clear() if time.time() > self.next_cache_check: self.check_cache_size() self.next_cache_check = time.time() + 30 diff --git a/server/controller.py b/server/controller.py index 09fc78d..d77b35d 100644 --- a/server/controller.py +++ b/server/controller.py @@ -214,7 +214,6 @@ class Controller(ServerBase): self.ensure_future(self.log_start_external_servers()) self.ensure_future(self.housekeeping()) self.ensure_future(self.mempool.main_loop()) - self.ensure_future(self.notify()) def close_servers(self, kinds): '''Close the servers of the given kinds (TCP etc.).''' @@ -272,27 +271,24 @@ class Controller(ServerBase): sslc.load_cert_chain(env.ssl_certfile, keyfile=env.ssl_keyfile) await self.start_server('SSL', host, env.ssl_port, ssl=sslc) - async def notify(self): + def notify_sessions(self, touched): '''Notify sessions about height changes and touched addresses.''' - while True: - await self.mempool.touched_event.wait() - touched = self.mempool.touched.copy() - self.mempool.touched.clear() - self.mempool.touched_event.clear() + # Invalidate caches + hc = self.history_cache + for hashX in set(hc).intersection(touched): + del hc[hashX] - # Invalidate caches - hc = self.history_cache - for hashX in set(hc).intersection(touched): - del hc[hashX] - if self.bp.db_height != self.cache_height: - self.cache_height = self.bp.db_height - self.header_cache.clear() + height = self.bp.db_height + if height != self.cache_height: + self.cache_height = height + self.header_cache.clear() - # Make a copy; self.sessions can change whilst await-ing - sessions = [s for s in self.sessions - if isinstance(s, self.coin.SESSIONCLS)] - for session in sessions: - await session.notify(self.bp.db_height, touched) + # Height notifications are synchronous. Those sessions with + # touched addresses are scheduled for asynchronous completion + for session in self.sessions: + session_touched = session.notify(height, touched) + if session_touched is not None: + self.ensure_future(session.notify_async(session_touched)) def notify_peers(self, updates): '''Notify of peer updates.''' diff --git a/server/mempool.py b/server/mempool.py index 591a2aa..c5e35d3 100644 --- a/server/mempool.py +++ b/server/mempool.py @@ -37,8 +37,7 @@ class MemPool(util.LoggedClass): self.controller = controller self.coin = bp.coin self.db = bp - self.touched = bp.touched - self.touched_event = asyncio.Event() + self.touched = set() self.prioritized = set() self.stop = False self.txs = {} @@ -101,7 +100,8 @@ class MemPool(util.LoggedClass): while True: # Avoid double notifications if processing a block if self.touched and not self.processing_new_block(): - self.touched_event.set() + self.controller.notify_sessions(self.touched) + self.touched.clear() # Log progress / state todo = len(unfetched) + len(unprocessed) @@ -177,6 +177,17 @@ class MemPool(util.LoggedClass): return process + def on_new_block(self, touched): + '''Called after processing one or more new blocks. + + Touched is a set of hashXs touched by the transactions in the + block. Caller must be aware it is modified by this function. + ''' + # Minor race condition here with mempool processor thread + touched.update(self.touched) + self.touched.clear() + self.controller.notify_sessions(touched) + def processing_new_block(self): '''Return True if we're processing a new block.''' return self.daemon.cached_height() > self.db.db_height diff --git a/server/session.py b/server/session.py index e1ccaad..aa9cdfd 100644 --- a/server/session.py +++ b/server/session.py @@ -119,64 +119,70 @@ class ElectrumX(SessionBase): def sub_count(self): return len(self.hashX_subs) - async def notify(self, height, touched): - '''Notify the client about changes in height and touched addresses. + async def notify_async(self, our_touched): + changed = {} - Cache is a shared cache for this update. - ''' - pairs = [] - changed = [] - - matches = touched.intersection(self.hashX_subs) - for hashX in matches: + for hashX in our_touched: alias = self.hashX_subs[hashX] status = await self.address_status(hashX) - changed.append((alias, status)) + changed[alias] = status - if height != self.notified_height: - self.notified_height = height - if self.subscribe_headers: - args = (self.controller.electrum_header(height), ) - pairs.append(('blockchain.headers.subscribe', args)) + # Check mempool hashXs - the status is a function of the + # confirmed state of other transactions + for hashX, old_status in self.mempool_statuses.items(): + status = await self.address_status(hashX) + if status != old_status: + alias = self.hashX_subs[hashX] + changed[alias] = status - if self.subscribe_height: - pairs.append(('blockchain.numblocks.subscribe', (height, ))) - - # Check mempool hashXs - the status is a function of the - # confirmed state of other transactions - for hashX in set(self.mempool_statuses).difference(matches): - old_status = self.mempool_statuses[hashX] - status = await self.address_status(hashX) - if status != old_status: - alias = self.hashX_subs[hashX] - changed.append((alias, status)) - - for alias_status in changed: - if len(alias_status[0]) == 64: + for alias, status in changed.items(): + if len(alias) == 64: method = 'blockchain.scripthash.subscribe' else: method = 'blockchain.address.subscribe' - pairs.append((method, alias_status)) + self.send_notification(method, (alias, status)) - if pairs: - self.send_notifications(pairs) - if changed: - es = '' if len(changed) == 1 else 'es' - self.log_info('notified of {:,d} address{}' - .format(len(changed), es)) + if changed: + es = '' if len(changed) == 1 else 'es' + self.log_info('notified of {:,d} address{}' + .format(len(changed), es)) + + def notify(self, height, touched): + '''Notify the client about changes to touched addresses (from mempool + updates or new blocks) and height. + + Return the set of addresses the session needs to be + asyncronously notified about. This can be empty if there are + possible mempool status updates. + + Returns None if nothing needs to be notified asynchronously. + ''' + height_changed = height != self.notified_height + if height_changed: + self.notified_height = height + if self.subscribe_headers: + args = (self.controller.electrum_header(height), ) + self.send_notification('blockchain.headers.subscribe', args) + if self.subscribe_height: + args = (height, ) + self.send_notification('blockchain.numblocks.subscribe', args) + + our_touched = touched.intersection(self.hashX_subs) + if our_touched or (height_changed and self.mempool_statuses): + return our_touched + + return None def height(self): '''Return the current flushed database height.''' return self.bp.db_height - def current_electrum_header(self): - '''Used as response to a headers subscription request.''' - return self.controller.electrum_header(self.height()) - def headers_subscribe(self): '''Subscribe to get headers of new blocks.''' self.subscribe_headers = True - return self.current_electrum_header() + height = self.height() + self.notified_height = height + return self.controller.electrum_header(height) def numblocks_subscribe(self): '''Subscribe to get height of new blocks.'''