From d7992272317a29710ffa9436cb7cf91d2c6a8d5e Mon Sep 17 00:00:00 2001 From: 4tochka Date: Mon, 3 Jun 2019 14:21:15 +0400 Subject: [PATCH] connector --- pybtc/connector/block_loader.py | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/pybtc/connector/block_loader.py b/pybtc/connector/block_loader.py index 19874d2..c0a4ee5 100644 --- a/pybtc/connector/block_loader.py +++ b/pybtc/connector/block_loader.py @@ -65,7 +65,7 @@ class BlockLoader: self.worker_tasks = [self.loop.create_task(self.start_worker(i)) for i in range(self.worker_limit)] target_height = self.parent.node_last_block - self.parent.deep_sync_limit height = self.parent.last_block_height + 1 - self.log.info(str(height)) + while height < target_height: new_requests = 0 if self.parent.block_preload._store_size < self.parent.block_preload_cache_limit: @@ -86,7 +86,7 @@ class BlockLoader: if height <= self.parent.last_block_height: height = self.parent.last_block_height + 1 await self.pipe_sent_msg(self.worker[i].writer, b'rpc_batch_limit', - int_to_bytes(self.rpc_batch_limit)) + int_to_bytes(self.rpc_batch_limit)) await self.pipe_sent_msg(self.worker[i].writer, b'get', int_to_bytes(height)) height += self.rpc_batch_limit new_requests += 1 @@ -223,19 +223,20 @@ class Worker: policy.set_event_loop(policy.new_event_loop()) self.loop = asyncio.get_event_loop() self.log = logging.getLogger("Block loader") - self.log.setLevel(logging.INFO) + self.log.setLevel(logging.DEBUG) self.loop.set_default_executor(ThreadPoolExecutor(20)) self.out_writer = out_writer self.in_reader = in_reader - self.coins = MRU(4000000) - self.destroyed_coins = MRU(4000000) - self.a_coins = MRU(4000000) + self.coins = MRU(1000000) + self.destroyed_coins = MRU(1000000) + self.a_coins = MRU(1000000) signal.signal(signal.SIGTERM, self.terminate) self.loop.create_task(self.message_loop()) self.loop.run_forever() - async def load_blocks(self, height): + async def load_blocks(self, height, limit): try: + self.log.critical("%s block loader get from %s to %s" % (self.name, height, limit)) attempt = 10 t = 0 start_height = height @@ -244,7 +245,7 @@ class Worker: while True: batch.append(["getblockhash", height]) h_list.append(height) - if len(batch) >= self.rpc_batch_limit: + if len(batch) >= limit: height += 1 break height += 1 @@ -312,10 +313,14 @@ class Worker: except: pass blocks[x] = pickle.dumps(blocks[x]) + self.log.critical("%s block loader blocks %s" %(self.name, len(blocks[x]))) + self.log.critical("%s block loader checkpoint %s" %(self.name, blocks[x]["checkpoint"])) await self.pipe_sent_msg(b'result', pickle.dumps(blocks)) except: - await self.pipe_sent_msg(b'result', pickle.dumps([])) + self.log.critical("load blocks error") self.log.critical(str(traceback.format_exc())) + await self.pipe_sent_msg(b'result', pickle.dumps([])) + async def message_loop(self): try: @@ -328,7 +333,7 @@ class Worker: return if msg_type == b'get': - self.loop.create_task(self.load_blocks(bytes_to_int(msg))) + self.loop.create_task(self.load_blocks(bytes_to_int(msg), self.rpc_batch_limit)) continue if msg_type == b'rpc_batch_limit':