From 7126864052c57cc92dffa33b252eec216672c3ff Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Fri, 12 Jan 2018 15:19:36 +0800 Subject: [PATCH] Wait for mempool to sync before starting external servers Closes #335 --- server/controller.py | 10 +++++----- server/mempool.py | 18 +++++------------- 2 files changed, 10 insertions(+), 18 deletions(-) diff --git a/server/controller.py b/server/controller.py index fecb4b9..e7e45b9 100644 --- a/server/controller.py +++ b/server/controller.py @@ -125,9 +125,8 @@ class Controller(ServerBase): return self.mempool.value(hashX) def sent_tx(self, tx_hash): - '''Call when a TX is sent. Tells mempool to prioritize it.''' + '''Call when a TX is sent.''' self.txs_sent += 1 - self.mempool.prioritize(tx_hash) def setup_bands(self): bands = [] @@ -206,14 +205,15 @@ class Controller(ServerBase): self.next_log_sessions = time.time() + self.env.log_sessions async def wait_for_bp_catchup(self): - '''Wait for the block processor to catch up, then kick off server - background processes.''' + '''Wait for the block processor to catch up, and for the mempool to + synchronize, then kick off server background processes.''' await self.bp.caught_up_event.wait() self.logger.info('block processor has caught up') + self.ensure_future(self.mempool.main_loop()) + await self.mempool.synchronized_event.wait() self.ensure_future(self.peer_mgr.main_loop()) self.ensure_future(self.log_start_external_servers()) self.ensure_future(self.housekeeping()) - self.ensure_future(self.mempool.main_loop()) def close_servers(self, kinds): '''Close the servers of the given kinds (TCP etc.).''' diff --git a/server/mempool.py b/server/mempool.py index 219a38c..4193c83 100644 --- a/server/mempool.py +++ b/server/mempool.py @@ -38,15 +38,10 @@ class MemPool(util.LoggedClass): self.coin = bp.coin self.db = bp self.touched = set() - self.prioritized = set() self.stop = False self.txs = {} self.hashXs = defaultdict(set) # None can be a key - - def prioritize(self, tx_hash): - '''Prioritize processing the given hash. This is important during - initial mempool sync.''' - self.prioritized.add(tx_hash) + self.synchronized_event = asyncio.Event() def _resync_daemon_hashes(self, unprocessed, unfetched): '''Re-sync self.txs with the list of hashes in the daemon's mempool. @@ -91,9 +86,9 @@ class MemPool(util.LoggedClass): fetch_size = 800 process_some = self._async_process_some(fetch_size // 2) - await self.daemon.mempool_refresh_event.wait() self.logger.info('beginning processing of daemon mempool. ' 'This can take some time...') + await self.daemon.mempool_refresh_event.wait() next_log = 0 loops = -1 # Zero during initial catchup @@ -111,6 +106,8 @@ class MemPool(util.LoggedClass): '({:,d} txs left)'.format(pct, todo)) if not todo: loops += 1 + if loops > 0: + self.synchronized_event.set() now = time.time() if now >= next_log and loops: self.logger.info('{:,d} txs touching {:,d} addresses' @@ -119,7 +116,6 @@ class MemPool(util.LoggedClass): try: if not todo: - self.prioritized.clear() await self.daemon.mempool_refresh_event.wait() self._resync_daemon_hashes(unprocessed, unfetched) @@ -148,10 +144,6 @@ class MemPool(util.LoggedClass): raw_txs = {} - for hex_hash in self.prioritized: - if hex_hash in unprocessed: - raw_txs[hex_hash] = unprocessed.pop(hex_hash) - while unprocessed and len(raw_txs) < limit: hex_hash, raw_tx = unprocessed.popitem() raw_txs[hex_hash] = raw_tx @@ -213,7 +205,7 @@ class MemPool(util.LoggedClass): db_utxo_lookup = self.db.db_utxo_lookup txs = self.txs - # Deserialize each tx and put it in our priority queue + # Deserialize each tx and put it in a pending list for tx_hash, raw_tx in raw_tx_map.items(): if tx_hash not in txs: continue