From 1f7334718ca63c85a897cc979a0744a98d84ec3b Mon Sep 17 00:00:00 2001 From: 4tochka Date: Tue, 7 May 2019 17:54:00 +0400 Subject: [PATCH] connector --- pybtc/connector/block_loader.py | 126 +++++++++++++++++++++++++++----- pybtc/connector/connector.py | 90 ++--------------------- 2 files changed, 114 insertions(+), 102 deletions(-) diff --git a/pybtc/connector/block_loader.py b/pybtc/connector/block_loader.py index 588b725..804f97b 100644 --- a/pybtc/connector/block_loader.py +++ b/pybtc/connector/block_loader.py @@ -1,6 +1,7 @@ import asyncio import os from multiprocessing import Process +from pybtc.functions.tools import int_to_bytes, bytes_to_int from concurrent.futures import ThreadPoolExecutor from setproctitle import setproctitle import logging @@ -9,18 +10,68 @@ import sys import aiojsonrpc import traceback import pickle + class BlockLoader: def __init__(self, parent, workers=4): self.worker = dict() self.worker_busy = dict() + self.parent = parent + self.loading_task = None self.log = parent.log self.loop = parent.loop self.rpc_url = parent.rpc_url self.rpc_timeout = parent.rpc_timeout self.rpc_batch_limit = parent.rpc_batch_limit self.loop.set_default_executor(ThreadPoolExecutor(workers * 2)) + self.watchdog_task = self.loop.create_task(self.watchdog()) [self.loop.create_task(self.start_worker(i)) for i in range(workers)] + async def watchdog(self): + while True: + try: + if self.loading_task is None or self.loading_task.done(): + if self.parent.deep_synchronization: + self.loading_task = self.loop.create_task(self.loading()) + else: + pass + # clear tail + + except asyncio.CancelledError: + self.log.info("connector watchdog terminated") + break + except Exception as err: + self.log.error(str(traceback.format_exc())) + self.log.error("watchdog error %s " % err) + await asyncio.sleep(60) + + + async def loading(self): + target_height = self.parent.node_last_block - self.parent.self.deep_sync_limit + height = self.parent.last_block_height + 1 + while height < target_height: + new_requests = 0 + if self.parent.block_preload._store_size < self.parent.block_preload_cache_limit: + try: + if height <= self.parent.last_block_height: + height = self.parent.last_block_height + 1 + for i in self.worker_busy: + if not self.worker_busy[i]: + self.worker_busy[i] = True + self.pipe_sent_msg(self.worker[i].writer, b'get', int_to_bytes(height)) + height += self.rpc_batch_limit + new_requests += 1 + if not new_requests: + await asyncio.sleep(1) + except asyncio.CancelledError: + self.log.info("Loading task terminated") + break + except Exception as err: + self.log.error("Loading task error %s " % err) + else: + await asyncio.sleep(1) + + + async def start_worker(self,index): self.log.warning('Start block loader worker %s' % index) # prepare pipes for communications @@ -48,21 +99,6 @@ class BlockLoader: del self.worker[index] self.log.warning('Block loader worker %s is stopped' % index) - async def load_blocks(self, batch): - while True: - for i in self.worker_busy: - if not self.worker_busy[i]: - self.worker_busy[i] = True - try: - self.log.warning("<<<<<") - self.pipe_sent_msg(self.worker[i].writer, b'get', pickle.dumps(batch)) - self.log.warning("ok<") - except: - self.log.warning(str(traceback.format_exc())) - finally: - self.worker_busy[i] = False - return None - await asyncio.sleep(1) async def get_pipe_reader(self, fd_reader): @@ -147,20 +183,16 @@ class Worker: self.loop.run_forever() async def message_loop(self): - self.log.critical("xxx") try: self.rpc = aiojsonrpc.rpc(self.rpc_url, self.loop, timeout=self.rpc_timeout) self.reader = await self.get_pipe_reader(self.in_reader) - self.log.critical("reader") while True: - self.log.critical("get pos") msg_type, msg = await self.pipe_get_msg(self.reader) - self.log.critical(str(len(msg))) if msg_type == b'pipe_read_error': return if msg_type == b'get': - self.log.critical(str(len(msg))) + self.log.critical(str(bytes_to_int(msg))) continue except: self.log.critical("exc") @@ -208,4 +240,58 @@ class Worker: +""" + batch = list() + h_list = list() + while True: + batch.append(["getblockhash", height]) + h_list.append(height) + if len(batch) >= self.rpc_batch_limit or height >= max_height: + height += 1 + break + height += 1 + result = await self.rpc.batch(batch) + h = list() + batch = list() + for lh, r in zip(h_list, result): + try: + self.block_hashes.set(lh, r["result"]) + batch.append(["getblock", r["result"], 0]) + h.append(lh) + except: + pass + self.log.critical(">>>") + blocks = await self.block_loader.load_blocks(batch) + + for x,y in zip(h,blocks): + try: + self.block_preload.set(x, y) + except: + pass + except asyncio.CancelledError: + self.log.info("connector preload_block_hashes failed") + break + except: + pass + + if processed_height < self.last_block_height: + for i in range(processed_height, self.last_block_height ): + try: + self.block_preload.remove(i) + except: + pass + processed_height = self.last_block_height + if self.block_preload._store and next(iter(self.block_preload._store)) < processed_height + 1: + for i in range(next(iter(self.block_preload._store)), self.last_block_height+1): + try: + self.block_preload.remove(i) + except: + pass + if self.block_preload._store_size < self.block_preload_cache_limit * 0.9: + continue + + await asyncio.sleep(10) + # remove unused items + +""" diff --git a/pybtc/connector/connector.py b/pybtc/connector/connector.py index d26277a..d578560 100644 --- a/pybtc/connector/connector.py +++ b/pybtc/connector/connector.py @@ -12,7 +12,7 @@ import zmq import zmq.asyncio import asyncio import time - +import pickle class Connector: def __init__(self, node_rpc_url, node_zerromq_url, logger, @@ -282,7 +282,7 @@ class Connector: if self.node_last_block <= self.last_block_height + self.backlog: d = await self.rpc.getblockcount() if d == self.node_last_block: - self.log.info("blockchain is synchronized with backlog %s" % self.backlog) + self.log.info("Blockchain is synchronized with backlog %s" % self.backlog) return else: self.node_last_block = d @@ -297,21 +297,16 @@ class Connector: self.log.warning("Normal synchronization mode") # clear preload caches self.deep_synchronization = False - + block = None if self.deep_synchronization: - block = self.block_preload.pop(self.last_block_height + 1) - if not block: - h = self.block_hashes.pop(self.last_block_height + 1) - if h is None: - h = await self.rpc.getblockhash(self.last_block_height + 1) - if not self.block_hashes_preload_mutex: - self.loop.create_task(self.preload_blocks()) - block = await self._get_block_by_hash(h) - else: + raw_block = self.block_preload.pop(self.last_block_height + 1) + if raw_block: + block = pickle.loads(raw_block) + + if not block: h = await self.rpc.getblockhash(self.last_block_height + 1) block = await self._get_block_by_hash(h) - self.loop.create_task(self._new_block(block)) except Exception as err: self.log.error("get next block failed %s" % str(err)) @@ -656,75 +651,6 @@ class Connector: return stxo - async def preload_blocks(self): - if self.block_hashes_preload_mutex: - return - try: - self.block_hashes_preload_mutex = True - max_height = self.node_last_block - self.deep_synchronization - height = self.last_block_height + 1 - processed_height = self.last_block_height - - while height < max_height: - if self.block_preload._store_size < self.block_preload_cache_limit: - try: - if height < self.last_block_height: - height = self.last_block_height + 1 - batch = list() - h_list = list() - while True: - batch.append(["getblockhash", height]) - h_list.append(height) - if len(batch) >= self.rpc_batch_limit or height >= max_height: - height += 1 - break - height += 1 - result = await self.rpc.batch(batch) - h = list() - batch = list() - for lh, r in zip(h_list, result): - try: - self.block_hashes.set(lh, r["result"]) - batch.append(["getblock", r["result"], 0]) - h.append(lh) - except: - pass - self.log.critical(">>>") - blocks = await self.block_loader.load_blocks(batch) - - for x,y in zip(h,blocks): - try: - self.block_preload.set(x, y) - except: - pass - except asyncio.CancelledError: - self.log.info("connector preload_block_hashes failed") - break - except: - pass - - if processed_height < self.last_block_height: - for i in range(processed_height, self.last_block_height ): - try: - self.block_preload.remove(i) - except: - pass - processed_height = self.last_block_height - if self.block_preload._store and next(iter(self.block_preload._store)) < processed_height + 1: - for i in range(next(iter(self.block_preload._store)), self.last_block_height+1): - try: - self.block_preload.remove(i) - except: - pass - if self.block_preload._store_size < self.block_preload_cache_limit * 0.9: - continue - - await asyncio.sleep(10) - # remove unused items - - finally: - self.block_hashes_preload_mutex = False - async def stop(self): self.active = False