From 8cac008cd5f6b0f9653bc07a7c50e0fecaf9d5d2 Mon Sep 17 00:00:00 2001 From: 4tochka Date: Wed, 1 May 2019 14:29:51 +0400 Subject: [PATCH] connector --- pybtc/connector.py | 67 +++++++++++++++++++++++++++++++--------------- 1 file changed, 45 insertions(+), 22 deletions(-) diff --git a/pybtc/connector.py b/pybtc/connector.py index a926ab0..4bf4721 100644 --- a/pybtc/connector.py +++ b/pybtc/connector.py @@ -19,7 +19,7 @@ class Connector: before_block_handler=None, block_handler=None, after_block_handler=None, block_timeout=30, deep_sync_limit=20, backlog=0, mempool_tx=True, - rpc_batch_limit=20, rpc_threads_limit=100, rpc_timeout=100, + rpc_batch_limit=50, rpc_threads_limit=100, rpc_timeout=100, preload=False, utxo_data=False, utxo_cache_size=2000000, @@ -74,7 +74,8 @@ class Connector: # cache and system self.preload = preload self.block_preload = Cache(max_size=50000) - self.block_hashes_preload = Cache(max_size=50000) + self.block_hashes = Cache(max_size=100000) + self.block_hashes_preload_mutex = False self.tx_cache = Cache(max_size=50000) self.block_cache = Cache(max_size=10000) @@ -139,7 +140,6 @@ class Connector: self.block_cache.set(row, h) h -= 1 - await self.preload_block_hashes() self.tasks.append(self.loop.create_task(self.zeromq_handler())) self.tasks.append(self.loop.create_task(self.watchdog())) self.connected.set_result(True) @@ -289,9 +289,16 @@ class Connector: else: if self.deep_synchronization: self.log.warning("Normal synchronization mode") + # clear preload caches self.deep_synchronization = False q = time.time() - h = await self.rpc.getblockhash(self.last_block_height + 1) + if self.deep_synchronization: + h = self.block_hashes.get(self.last_block_height + 1) + if h is None: + h = await self.rpc.getblockhash(self.last_block_height + 1) + self.loop.create_task(self.preload_block_hashes()) + else: + h = await self.rpc.getblockhash(self.last_block_height + 1) self.blocks_download_time += time.time() - q await self._get_block_by_hash(h) except Exception as err: @@ -574,25 +581,41 @@ class Connector: async def preload_block_hashes(self): - max_height = self.node_last_block - self.deep_synchronization - height = self.last_block_height + 1 - while height < max_height: - try: - batch = list() - while True: - batch.append(["getblockhash", height]) - if len(batch) >= self.batch_limit * 2 or height >= max_height: - break - height += 1 - result = await self.rpc.batch(batch) - self.log.warning(str(height)) - - except asyncio.CancelledError: - self.log.info("connector preload_block_hashes failed") - break - except: - pass + if self.block_hashes_preload_mutex: + return + try: + self.block_hashes_preload_mutex = True + self.block_hashes = Cache(max_size=100000) + max_height = self.node_last_block - self.deep_synchronization + height = self.last_block_height + 1 + lh = height + l = height + 100000 + if l > max_height: + l = max_height + while height < l: + try: + batch = list() + while True: + batch.append(["getblockhash", height]) + if len(batch) >= self.batch_limit or height >= l: + break + height += 1 + result = await self.rpc.batch(batch) + for r in result: + try: + self.block_hashes.set(lh, r["result"]) + except: + pass + lh += 1 + self.log.warning(str(height)) + except asyncio.CancelledError: + self.log.info("connector preload_block_hashes failed") + break + except: + pass + finally: + self.block_hashes_preload_mutex = False async def preload_block(self): while True: