From 5d5383a789b5e04f0c16f636c3b87eaedaea5445 Mon Sep 17 00:00:00 2001 From: 4tochka Date: Fri, 3 May 2019 00:01:00 +0400 Subject: [PATCH] connector --- pybtc/connector.py | 98 +++++++++++++++++----------------------------- 1 file changed, 36 insertions(+), 62 deletions(-) diff --git a/pybtc/connector.py b/pybtc/connector.py index 11ae58e..7aa6ea5 100644 --- a/pybtc/connector.py +++ b/pybtc/connector.py @@ -24,9 +24,8 @@ class Connector: block_timeout=30, deep_sync_limit=20, backlog=0, mempool_tx=True, rpc_batch_limit=50, rpc_threads_limit=100, rpc_timeout=100, - preload=False, utxo_data=False, - utxo_cache_size=2000000, + utxo_cache_size=1000000, skip_opreturn=True, postgres_pool=None): self.loop = asyncio.get_event_loop() @@ -69,6 +68,7 @@ class Connector: self.active_block.set_result(True) self.last_zmq_msg = int(time.time()) self.total_received_tx = 0 + self.total_received_tx_stat = 0 self.blocks_processed_count = 0 self.blocks_decode_time = 0 self.blocks_download_time = 0 @@ -77,12 +77,15 @@ class Connector: self.start_time = time.time() # cache and system - self.preload = preload - self.block_preload = Cache(max_size=500 * 1000000) - self.block_hashes = Cache(max_size=200 * 100000) + self.block_preload_cache_limit = 500 * 1000000 + self.block_hashes_cache_limit = 200 * 100000 + self.tx_cache_limit = 100 * 100000 + self.block_headers_cache_limit = 100 * 100000 + self.block_preload = Cache(max_size=self.block_preload_cache_limit) + self.block_hashes = Cache(max_size=self.block_hashes_cache_limit) self.block_hashes_preload_mutex = False - self.tx_cache = Cache(max_size=100 * 1000000) - self.block_cache = Cache(max_size=20 * 1000000) + self.tx_cache = Cache(max_size=self.tx_cache_limit) + self.block_headers_cache = Cache(max_size=self.block_headers_cache_limit) self.block_txs_request = None @@ -142,7 +145,7 @@ class Connector: if h < len(self.chain_tail): raise Exception("Chain tail len not match last block height") for row in reversed(self.chain_tail): - self.block_cache.set(row, h) + self.block_headers_cache.set(row, h) h -= 1 self.tasks.append(self.loop.create_task(self.zeromq_handler())) @@ -308,7 +311,7 @@ class Connector: if h is None: h = await self.rpc.getblockhash(self.last_block_height + 1) if not self.block_hashes_preload_mutex: - self.loop.create_task(self.preload_block_hashes()) + self.loop.create_task(self.preload_blocks()) block = await self._get_block_by_hash(h) else: h = await self.rpc.getblockhash(self.last_block_height + 1) @@ -342,7 +345,7 @@ class Connector: self.log.error(str(traceback.format_exc())) async def _new_block(self, block): - if self.block_cache.get(block["hash"]) is not None: + if self.block_headers_cache.get(block["hash"]) is not None: return if self.deep_synchronization: block["height"] = self.last_block_height + 1 @@ -369,7 +372,7 @@ class Connector: if self.block_handler and not self.cache_loading: await self.block_handler(block) - self.block_cache.set(block["hash"], block["height"]) + self.block_headers_cache.set(block["hash"], block["height"]) self.last_block_height = block["height"] if self.utxo_data: self.loop.create_task(self.utxo.save_utxo(block["height"])) @@ -379,18 +382,21 @@ class Connector: [self.tx_cache.pop(h) for h in tx_bin_list] tx_rate = round(self.total_received_tx / (time.time() - self.start_time), 4) - if block["height"] % 1000 == 0: - self.log.info("Blocks %s; tx rate: %s;" % (block["height"], tx_rate)) + + if (self.total_received_tx - self.total_received_tx_stat) > 10000: + self.total_received_tx_stat = self.total_received_tx + self.log.warning("Blocks %s; tx rate: %s;" % (block["height"], tx_rate)) if self.utxo_data: loading = "Loading ... " if self.cache_loading else "" self.log.info(loading + "UTXO %s; hit rate: %s;" % (self.utxo.len(), self.utxo.hit_rate())) - self.log.info("Blocks download time %s;" % self.blocks_download_time) - self.log.info("Blocks decode time %s;" % self.blocks_decode_time) - self.log.info("Blocks non cached %s;" % self.non_cached_blocks) - self.log.warning("Blocks cache %s;" % self.block_preload.len()) - self.log.warning("Blocks cache size %s;" % self.block_preload._store_size ) - self.log.warning("Blocks cache last %s;" % self.block_preload.get_last_key()) + self.log.info("Blocks downloaded %s; decoded %s" % (self.blocks_download_time, + self.blocks_decode_time)) + if self.deep_synchronization: + self.log.info("Synchronization:") + self.log.info("Blocks not cached %s;" % self.non_cached_blocks) + self.log.warning("Blocks cache count %s;" % self.block_preload.len()) + self.log.warning("Blocks cache size %s M;" % round(self.block_preload._store_size/1024/1024,2)) # after block added handler if self.after_block_handler and not self.cache_loading: @@ -454,23 +460,21 @@ class Connector: self.log.debug("Transactions received: %s [%s] received tx rate tx/s ->> %s <<" % (tx_count, time.time() - q, rate)) async def verify_block_position(self, block): - if self.block_cache.get(block["hash"]) is not None: - self.log.error("duplicated block %s" % block["hash"]) - raise Exception("duplicated block") if "previousblockhash" not in block : return - lb = self.block_cache.get_last_key() - if lb is None and not self.last_block_height: + if self.block_headers_cache.len() == 0: return - if self.block_cache.get_last_key() != block["previousblockhash"]: - if self.block_cache.get(block["previousblockhash"]) is None and self.last_block_height: + + lb = self.block_headers_cache.get_last_key() + if self.block_headers_cache.get_last_key() != block["previousblockhash"]: + if self.block_headers_cache.get(block["previousblockhash"]) is None and self.last_block_height: self.log.critical("Connector error! Node out of sync " "no parent block in chain tail %s" % block["previousblockhash"]) raise Exception("Node out of sync") if self.orphan_handler: await self.orphan_handler(self.last_block_height) - self.block_cache.pop_last() + self.block_headers_cache.pop_last() self.last_block_height -= 1 raise Exception("Sidebranch block removed") @@ -618,7 +622,7 @@ class Connector: return stxo - async def preload_block_hashes(self): + async def preload_blocks(self): if self.block_hashes_preload_mutex: return try: @@ -628,12 +632,10 @@ class Connector: processed_height = self.last_block_height while height < max_height: - if self.block_preload._store_size < 400 * 1000000: + if self.block_preload._store_size < self.block_preload_cache_limit * 0.9: try: if height < self.last_block_height: height = self.last_block_height + 1 - # self.log.critical(str((height, processed_height, self.last_block_height, - # self.block_hashes.len() ))) batch = list() h_list = list() while True: @@ -653,9 +655,7 @@ class Connector: h.append(lh) except: pass - # self.log.critical(str(( len(batch), ))) - # if not batch: - # self.log.critical(str((h_list, result))) + blocks = await self.rpc.batch(batch) for x,y in zip(h,blocks): @@ -673,41 +673,15 @@ class Connector: for i in range(processed_height, self.last_block_height): self.block_preload.remove(i) processed_height = self.last_block_height - if self.block_preload._store_size < 400 * 1000000: + if self.block_preload._store_size < self.block_preload_cache_limit * 0.9: continue - self.log.critical(str((processed_height, self.last_block_height))) - await asyncio.sleep(10) + await asyncio.sleep(1) # remove unused items - finally: self.block_hashes_preload_mutex = False - async def preload_block(self): - while True: - try: - start_height = self.last_block_height - height = start_height + 10 - d = await self.rpc.getblockcount() - if d > height: - while True: - height += 1 - d = await self.rpc.getblockhash(height) - ex = self.block_preload.get(d) - if not ex: - b = await self.rpc.getblock(d, 0) - block = decode_block_tx(b) - self.block_preload.set(d, block) - if start_height + 15000 < height: - break - except asyncio.CancelledError: - self.log.info("connector preload_block terminated") - break - except: - pass - await asyncio.sleep(15) - async def stop(self): self.active = False