From 9be276e20f08225d6c816f5f6cc2d88bfe56de0b Mon Sep 17 00:00:00 2001 From: 4tochka Date: Sun, 19 May 2019 11:32:09 +0400 Subject: [PATCH] connector --- pybtc/connector/block_loader.py | 3 +-- pybtc/connector/connector.py | 35 ++++++++++++++++------------- pybtc/connector/utxo.py | 40 ++++++++++++++++++++------------- 3 files changed, 44 insertions(+), 34 deletions(-) diff --git a/pybtc/connector/block_loader.py b/pybtc/connector/block_loader.py index c1633de..b09a053 100644 --- a/pybtc/connector/block_loader.py +++ b/pybtc/connector/block_loader.py @@ -44,13 +44,12 @@ class BlockLoader: else: # clear unused cache if self.parent.block_preload._store: - self.log.warning(">>>" + str(next(iter(self.parent.block_preload._store)))) if next(iter(self.parent.block_preload._store)) <= self.parent.last_block_height: for i in range(next(iter(self.parent.block_preload._store)), self.parent.last_block_height + 1): try: self.parent.block_preload.remove(i) - except: self.log.warning(str(traceback.format_exc())) + except: pass except asyncio.CancelledError: self.log.info("connector watchdog terminated") diff --git a/pybtc/connector/connector.py b/pybtc/connector/connector.py index 43dc29d..eb58001 100644 --- a/pybtc/connector/connector.py +++ b/pybtc/connector/connector.py @@ -361,8 +361,9 @@ class Connector: self.cache_loading = True if self.last_block_height < self.app_block_height_on_start else False - if self.deep_synchronization: - tx_bin_list = [block["rawTx"][i]["txId"] for i in block["rawTx"]] + if not self.deep_synchronization: + if not self.block_batch_handler: + tx_bin_list = [block["rawTx"][i]["txId"] for i in block["rawTx"]] else: tx_bin_list = [s2rh(h) for h in block["tx"]] await self.verify_block_position(block) @@ -374,27 +375,32 @@ class Connector: else: await self.fetch_block_transactions(block, tx_bin_list) - if self.block_handler and not self.cache_loading: - await self.block_handler(block) - - self.block_headers_cache.set(block["hash"], block["height"]) - self.last_block_height = block["height"] if self.utxo_data: + checkpoint = self.utxo.checkpoint try: self.utxo.checkpoints.append(block["checkpoint"]) except: pass if len(self.utxo.cached) > self.utxo.size_limit and \ not self.utxo.save_process and \ self.utxo.checkpoints: - # n = list() - # for i in self.utxo.checkpoints: - # if i >= block["height"]: - # n.append(i) - # self.utxo.checkpoints = n if self.utxo.checkpoints[0] < block["height"]: self.utxo.deleted_last_block = block["height"] self.utxo.pending_deleted = self.utxo.pending_deleted | self.utxo.deleted self.utxo.deleted = set() - self.loop.create_task(self.utxo.save_utxo()) + self.utxo.create_checkpoint() + else: + checkpoint = None + + if self.block_batch_handler and not self.cache_loading: + await self.block_batch_handler(block, checkpoint) + + if self.block_handler and not self.cache_loading: + await self.block_handler(block) + + self.block_headers_cache.set(block["hash"], block["height"]) + self.last_block_height = block["height"] + + if self.utxo_data and self.utxo.save_process: + self.loop.create_task(self.utxo.save_utxo()) self.blocks_processed_count += 1 @@ -583,9 +589,6 @@ class Connector: self.log.critical("utxo get failed " + rh2s(block["hash"])) raise Exception("utxo get failed ") - - if self.block_batch_handler and not self.cache_loading: - await self.block_batch_handler(block) self.total_received_tx += len(block["rawTx"]) except Exception as err: self.log.critical("new block error %s " % err) diff --git a/pybtc/connector/utxo.py b/pybtc/connector/utxo.py index 53db59e..eca7ef2 100644 --- a/pybtc/connector/utxo.py +++ b/pybtc/connector/utxo.py @@ -2,6 +2,7 @@ from pybtc import int_to_c_int, c_int_to_int, c_int_len import asyncio from collections import OrderedDict from pybtc import MRU +import traceback class UTXO(): def __init__(self, db_pool, loop, log, cache_size): @@ -9,6 +10,7 @@ class UTXO(): self.missed = set() self.deleted = set() self.pending_deleted = set() + self.pending_utxo = set() self.checkpoints = list() self.log = log self.loaded = MRU() @@ -43,21 +45,19 @@ class UTXO(): del self.cached[outpoint] - async def save_utxo(self): + def create_checkpoint(self): # save to db tail from cache if self.save_process or not self.cached: return if not self.checkpoints: return - self.save_process = True try: - # self.log.critical("cached " + str(len(self.cached)) ) i = self.cached.peek_last_item() self.checkpoints = sorted(self.checkpoints) checkpoint = self.checkpoints.pop(0) lb = 0 block_changed = False checkpoint_found = False - utxo = set() + while self.cached: i = self.cached.pop() if lb != i[1][0] >> 42: @@ -77,19 +77,26 @@ class UTXO(): if len(self.cached) <= self.size_limit: if block_changed and checkpoint_found: break - utxo.add((i[0],b"".join((int_to_c_int(i[1][0]), + self.pending_utxo.add((i[0],b"".join((int_to_c_int(i[1][0]), int_to_c_int(i[1][1]), i[1][2])))) self.pending_saved[i[0]] = i[1] if block_changed: self.cached.append({i[0]: i[1]}) lb -= 1 - if not checkpoint_found: - for i in reversed(self.pending_saved): - self.cached.append({i: self.pending_saved[i]}) - self.log.critical("checkpoint not found " +str(lb) +" > "+ str(self.checkpoints)) - await asyncio.sleep(5) - return + + self.checkpoint = lb if checkpoint_found else None + + except: + self.log.critical("create checkpoint error") + self.log.critical(str(traceback.format_exc())) + + + async def save_checkpoint(self): + # save to db tail from cache + if not self.checkpoint: return + try: + if not self.checkpoint: return async with self._db_pool.acquire() as conn: @@ -97,21 +104,22 @@ class UTXO(): if self.pending_deleted: await conn.execute("DELETE FROM connector_utxo WHERE " "outpoint = ANY($1);", self.pending_deleted) - if utxo: + if self.pending_utxo: await conn.copy_records_to_table('connector_utxo', - columns=["outpoint", "data"], records=utxo) + columns=["outpoint", "data"], records=self.pending_utxo) await conn.execute("UPDATE connector_utxo_state SET value = $1 " "WHERE name = 'last_block';", lb) await conn.execute("UPDATE connector_utxo_state SET value = $1 " "WHERE name = 'last_cached_block';", self.deleted_last_block) - self.saved_utxo += len(utxo) + self.saved_utxo += len(self.pending_utxo) self.deleted_utxo += len(self.pending_deleted) self.pending_deleted = set() + self.pending_utxo = set() - self.last_saved_block = lb + self.last_saved_block = self.checkpoint + self.checkpoint = None except: - import traceback self.log.critical("implement rollback ") self.log.critical(str(traceback.format_exc())) finally: