From 9d614842ed493c0fb0a810ebd7061ed0d0d29560 Mon Sep 17 00:00:00 2001 From: 4tochka Date: Tue, 30 Apr 2019 20:21:44 +0400 Subject: [PATCH] connector --- pybtc/connector.py | 605 ++++++++++++++++++++++++++++++++------------- 1 file changed, 438 insertions(+), 167 deletions(-) diff --git a/pybtc/connector.py b/pybtc/connector.py index 41e2d23..dfebef7 100644 --- a/pybtc/connector.py +++ b/pybtc/connector.py @@ -1,8 +1,8 @@ -from pybtc.functions.tools import rh2s +from pybtc.functions.tools import rh2s, s2rh from pybtc.functions.tools import var_int_to_int from pybtc.functions.tools import read_var_int -from pybtc.functions.tools import bytes_from_hex from pybtc.transaction import Transaction +from pybtc import int_to_c_int, c_int_to_int, c_int_len, int_to_bytes import traceback import aiojsonrpc import zmq @@ -15,13 +15,16 @@ from collections import OrderedDict class Connector: def __init__(self, node_rpc_url, node_zerromq_url, logger, last_block_height=0, chain_tail=None, - mempool_tx_list=None, tx_handler=None, orphan_handler=None, before_block_handler=None, block_handler=None, after_block_handler=None, block_timeout=30, deep_sync_limit=20, backlog=0, mempool_tx=True, rpc_batch_limit=20, rpc_threads_limit=100, rpc_timeout=100, - preload=False): + preload=False, + utxo_data=False, + utxo_cache_size=2000000, + skip_opreturn=True, + postgress_pool=None): self.loop = asyncio.get_event_loop() # settings @@ -31,20 +34,27 @@ class Connector: self.orphan_handler = orphan_handler self.block_timeout = block_timeout self.tx_handler = tx_handler + self.skip_opreturn = skip_opreturn self.before_block_handler = before_block_handler self.block_handler = block_handler self.after_block_handler = after_block_handler self.deep_sync_limit = deep_sync_limit self.backlog = backlog self.mempool_tx = mempool_tx + self.postgress_pool = postgress_pool + self.utxo_cache_size = utxo_cache_size + self.utxo_data = utxo_data self.chain_tail = list(chain_tail) if chain_tail else [] - self.mempool_tx_list = list(mempool_tx_list) if mempool_tx_list else [] self.rpc_timeout = rpc_timeout self.batch_limit = rpc_batch_limit # state and stats self.node_last_block = None - self.last_block_height = int(last_block_height) if int(last_block_height) else 0 + self.utxo = None + self.cache_loading = False + self.app_block_height_on_start = int(last_block_height) if int(last_block_height) else 0 + self.last_block_height = 0 + self.last_block_utxo_cached_height = 0 self.deep_synchronization = False self.block_dependency_tx = 0 # counter of tx that have dependencies in block @@ -58,8 +68,8 @@ class Connector: self.blocks_processed_count = 0 self.blocks_decode_time = 0 self.blocks_download_time = 0 - self.blocks_processing_time = 0 self.total_received_tx_time = 0 + self.start_time = time.time() # cache and system self.preload = preload @@ -71,8 +81,8 @@ class Connector: self.block_txs_request = None self.connected = asyncio.Future() - self.await_tx_list = list() - self.missed_tx_list = list() + self.await_tx = list() + self.missed_tx = list() self.await_tx_future = dict() self.add_tx_future = dict() self.get_missed_tx_threads = 0 @@ -84,6 +94,8 @@ class Connector: asyncio.ensure_future(self.start(), loop=self.loop) async def start(self): + await self.utxo_init() + while True: self.log.info("Connector initialization") try: @@ -114,7 +126,12 @@ class Connector: self.deep_synchronization = True break - [self.tx_cache.set(row, True) for row in self.mempool_tx_list] + if self.utxo_data: + self.utxo = UTXO(self.postgress_pool, + self.loop, + self.log, + self.utxo_cache_size if self.deep_synchronization else 0) + h = self.last_block_height if h < len(self.chain_tail): raise Exception("Chain tail len not match last block height") @@ -125,11 +142,51 @@ class Connector: self.tasks.append(self.loop.create_task(self.zeromq_handler())) self.tasks.append(self.loop.create_task(self.watchdog())) self.connected.set_result(True) - if self.preload: - self.loop.create_task(self.preload_block()) - self.loop.create_task(self.preload_block_hashes()) + # if self.preload: + # self.loop.create_task(self.preload_block()) + # self.loop.create_task(self.preload_block_hashes()) self.loop.create_task(self.get_next_block()) + async def utxo_init(self): + if self.utxo_data: + if self.postgress_pool is None: + raise Exception("UTXO data required postgresql db connection pool") + + async with self.postgress_pool.acquire() as conn: + await conn.execute("""CREATE TABLE IF NOT EXISTS + connector_utxo (outpoint BYTEA, + data BYTEA, + PRIMARY KEY(outpoint)); + """) + await conn.execute("""CREATE TABLE IF NOT EXISTS + connector_utxo_state (name VARCHAR, + value BIGINT, + PRIMARY KEY(name)); + """) + lb = await conn.execute("SELECT value FROM connector_utxo_state WHERE name='last_block';") + lc = await conn.execute("SELECT value FROM connector_utxo_state WHERE name='last_cached_block';") + if lb is None: + lb = 0 + lc = 0 + await conn.execute("INSERT INTO connector_utxo_state (name, value) " + "VALUES ('last_block', 0);") + await conn.execute("INSERT INTO connector_utxo_state (name, value) " + "VALUES ('last_cached_block', 0);") + self.last_block_height = lb + self.last_block_utxo_cached_height = lc + if self.app_block_height_on_start: + if self.app_block_height_on_start < self.last_block_utxo_cached_height: + self.log.critical("UTXO state last block %s app state last block %s " % (self.last_block_height, + self.last_block_utxo_cached_height)) + raise Exception("App blockchain state behind connector blockchain state") + if self.app_block_height_on_start > self.last_block_height: + self.log.warning("Connector utxo height behind App height for %s blocks ..." % + (self.app_block_height_on_start - self.last_block_height,)) + + else: + self.app_block_height_on_start = self.last_block_height + + async def zeromq_handler(self): while True: try: @@ -145,15 +202,19 @@ class Connector: msg = await self.zmqSubSocket.recv_multipart() topic = msg[0] body = msg[1] - if self.deep_synchronization: - continue + if topic == b"hashblock": self.last_zmq_msg = int(time.time()) + if self.deep_synchronization: + continue hash = body.hex() self.log.warning("New block %s" % hash) self.loop.create_task(self._get_block_by_hash(hash)) + elif topic == b"rawtx": self.last_zmq_msg = int(time.time()) + if self.deep_synchronization or not self.mempool_tx: + continue try: self.loop.create_task(self._new_transaction(Transaction(body, format="raw"))) except: @@ -204,43 +265,46 @@ class Connector: self.log.error("watchdog error %s " % err) async def get_next_block(self): - if not self.active: - return - if not self.get_next_block_mutex.done(): - await self.get_next_block_mutex - try: - self.get_next_block_mutex = asyncio.Future() + if self.active: + if not self.get_next_block_mutex.done(): + await self.get_next_block_mutex + try: + self.get_next_block_mutex = asyncio.Future() - if self.node_last_block <= self.last_block_height + self.backlog: - d = await self.rpc.getblockcount() - if d == self.node_last_block: - self.log.info("blockchain is synchronized backlog %s" % self.backlog) - return + if self.node_last_block <= self.last_block_height + self.backlog: + d = await self.rpc.getblockcount() + if d == self.node_last_block: + self.log.info("blockchain is synchronized with backlog %s" % self.backlog) + return + else: + self.node_last_block = d + d = self.node_last_block - self.last_block_height + + if d > self.deep_sync_limit: + if not self.deep_synchronization: + self.log.warning("Deep synchronization mode") + self.deep_synchronization = True else: - self.node_last_block = d - d = self.node_last_block - self.last_block_height - - if d > self.deep_sync_limit: - if not self.deep_synchronization: - self.log.warning("Deep synchronization mode") - self.deep_synchronization = True - else: - if self.deep_synchronization: - self.log.warning("Normal synchronization mode") - self.deep_synchronization = False - h = await self.rpc.getblockhash(self.last_block_height + 1) - await self._get_block_by_hash(h) - except Exception as err: - self.log.error("get next block failed %s" % str(err)) - finally: - self.get_next_block_mutex.set_result(True) + if self.deep_synchronization: + self.log.warning("Normal synchronization mode") + self.deep_synchronization = False + q = time.time() + h = await self.rpc.getblockhash(self.last_block_height + 1) + self.blocks_download_time += time.time() - q + await self._get_block_by_hash(h) + except Exception as err: + self.log.error("get next block failed %s" % str(err)) + finally: + self.get_next_block_mutex.set_result(True) async def _get_block_by_hash(self, hash): self.log.debug("get block by hash %s" % hash) try: block = self.block_hashes_preload.pop(hash) if not block: + q = time.time() block = await self.rpc.getblock(hash) + self.blocks_download_time += time.time() + q self.loop.create_task(self._new_block(block)) except Exception: self.log.error("get block by hash %s FAILED" % hash) @@ -249,111 +313,115 @@ class Connector: if not self.active or not self.active_block.done() or self.last_block_height >= block["height"]: return self.active_block = asyncio.Future() - self.block_dependency_tx = 0 - bin_block_hash = bytes_from_hex(block["hash"]) - bin_prev_block_hash = block["previousblockhash"] if "previousblockhash" in block else None - block_height = int(block["height"]) - self.log.debug("New block %s %s" % (block_height, block["hash"])) - bt = tm() - bpt = 0 + self.log.debug("Block %s %s" % (block["height"], block["hash"])) + bt = time.time() + self.cache_loading = True if self.last_block_height < self.app_block_height_on_start else False + try: - # blockchain position check - if self.block_cache.get(bin_block_hash) is not None: - self.log.debug("duplicated block %s" % block["hash"]) - return - if self.block_cache.get(bin_prev_block_hash) is None and self.last_block_height: - self.log.critical("Connector panic! Node out of sync no parent block in chain tail %s" % bin_prev_block_hash) - return + tx_bin_list = [s2rh(h) for h in block["tx"]] + await self.verify_block_position(block) - if self.last_block_height + 1 != block_height: - if self.orphan_handler: - tq = tm() - await self.orphan_handler(self.last_block_height) - self.log.info("orphan handler %s [%s]" % (self.last_block_height, tm(tq))) - self.block_cache.pop_last() - self.last_block_height -= 1 - return - # add all block transactions - - missed = set() - for h in block["tx"]: - if self.tx_cache.get(h) is None: - missed.add(h) - - if self.before_block_handler: - q = time.time() + if self.before_block_handler and not self.cache_loading: await self.before_block_handler(block) - bpt = time.time() - q - self.blocks_processing_time += bpt - self.log.info("Transactions missed %s" % len(missed)) - cq = tm() - if missed: - self.log.debug("Request missed transactions") - self.missed_tx_list = set(missed) - self.await_tx_list = missed - self.await_tx_future = dict() - for i in missed: - self.await_tx_future[i] = asyncio.Future() - self.block_txs_request = asyncio.Future() - if self.deep_synchronization: - self.loop.create_task(self._get_missed(block["hash"], block["time"], block["height"])) - else: - self.loop.create_task(self._get_missed(False, block["time"], block["height"])) - try: - await asyncio.wait_for(self.block_txs_request, timeout=self.block_timeout) - except asyncio.CancelledError: - # refresh rpc connection session - await self.rpc.close() - self.rpc = aiojsonrpc.rpc(self.rpc_url, self.loop, timeout=self.rpc_timeout) - raise RuntimeError("block transaction request timeout") + await self.fetch_block_transactions(block, tx_bin_list) - tx_count = len(block["tx"]) - self.total_received_tx += tx_count - self.total_received_tx_time += tm(cq) - rate = round(self.total_received_tx/self.total_received_tx_time) - self.log.info("Transactions received: %s [%s] rate tx/s ->> %s <<" % (tx_count, tm(cq), rate)) - - if self.block_handler: - q = time.time() + if self.block_handler and not self.cache_loading: await self.block_handler(block) - self.blocks_processing_time += time.time() - q - bpt += time.time() - q - # insert new block + self.block_cache.set(block["hash"], block["height"]) self.last_block_height = block["height"] + if self.utxo_data: + self.loop.create_task(self.utxo.save_utxo(block["height"])) + self.blocks_processed_count += 1 + + [self.tx_cache.pop(h) for h in tx_bin_list] + + tx_rate = round(self.total_received_tx / (time.time() - self.start_time), 4) + if block["height"] % 200 == 0: + self.log.info("Blocks %s; tx rate: %s;" % (block["height"], tx_rate)) + if self.utxo_data: + loading = "Loading ... " if self.cache_loading else "" + self.log.info(loading + "UTXO %s; hit rate: %s;" % (self.utxo.len(), + self.utxo.hit_rate())) + self.log.info("Blocks download time %s;" % self.blocks_download_time) + self.log.info("Blocks decode time %s;" % self.blocks_decode_time) # after block added handler - if self.after_block_handler: - q = time.time() + if self.after_block_handler and not self.cache_loading: try: await self.after_block_handler(block) except: pass - self.blocks_processing_time += time.time() - q - bpt += time.time() - q - self.blocks_processed_count += 1 - [self.tx_cache.pop(h) for h in block["tx"]] except Exception as err: - if self.await_tx_list: - self.await_tx_list = set() + if self.await_tx: + self.await_tx = set() self.log.error(str(traceback.format_exc())) - self.log.error("new block error %s" % str(err)) + self.log.error("block error %s" % str(err)) finally: self.active_block.set_result(True) - self.log.info("> %s < block [%s tx/ %s size] (dp %s) processing time %s cache [%s/%s]" % + self.log.debug("%s block [%s tx/ %s size] processing time %s cache [%s/%s]" % (block["height"], len(block["tx"]), block["size"] / 1000000, - self.block_dependency_tx, tm(bt), len(self.block_hashes_preload._store), len(self.block_preload._store))) if self.node_last_block > self.last_block_height: self.loop.create_task(self.get_next_block()) + async def fetch_block_transactions(self, block, tx_bin_list): + if not self.deep_synchronization: + missed = set() + for h in tx_bin_list: + if self.tx_cache.get(h) is None: + missed.add(h) + else: + missed = list(tx_bin_list) + self.log.debug("Transactions missed %s" % len(missed)) + + q = time.time() + if missed: + self.missed_tx = set(missed) + self.await_tx = set(missed) + self.await_tx_future = {i: asyncio.Future() for i in missed} + self.block_txs_request = asyncio.Future() + if self.deep_synchronization or self.mempool_tx == False: + self.loop.create_task(self._get_missed(block["hash"], block["time"], block["height"])) + else: + self.loop.create_task(self._get_missed(False, block["time"], block["height"])) + try: + await asyncio.wait_for(self.block_txs_request, timeout=self.block_timeout) + except asyncio.CancelledError: + # refresh rpc connection session + await self.rpc.close() + self.rpc = aiojsonrpc.rpc(self.rpc_url, self.loop, timeout=self.rpc_timeout) + raise RuntimeError("block transaction request timeout") + tx_count = len(block["tx"]) + self.total_received_tx += tx_count + self.total_received_tx_time += time.time() - q + + rate = round(self.total_received_tx/self.total_received_tx_time) + self.log.debug("Transactions received: %s [%s] received tx rate tx/s ->> %s <<" % (tx_count, time.time() - q, rate)) + + async def verify_block_position(self, block): + if self.block_cache.get(block["hash"]) is not None: + self.log.error("duplicated block %s" % block["hash"]) + raise Exception("duplicated block") + + if self.block_cache.get_last() != block["previousblockhash"]: + if self.block_cache.get(block["previousblockhash"]) is None and self.last_block_height: + self.log.critical("Connector error! Node out of sync " + "no parent block in chain tail %s" % block["previousblockhash"]) + raise Exception("Node out of sync") + + if self.orphan_handler: + await self.orphan_handler(self.last_block_height) + self.block_cache.pop_last() + self.last_block_height -= 1 + raise Exception("Sidebranch block removed") async def _get_missed(self, block_hash=False, block_time=None, block_height=None): if block_hash: @@ -369,31 +437,31 @@ class Connector: self.blocks_download_time += dt self.blocks_decode_time += qt - self.log.info("block downloaded %s decoded %s " % (round(dt, 4), round(qt, 4))) + self.log.debug("block downloaded %s decoded %s " % (round(dt, 4), round(qt, 4))) for index, tx in enumerate(block): try: - self.missed_tx_list.remove(rh2s(block[tx]["txId"])) + self.missed_tx.remove(block[tx]["txId"]) self.loop.create_task(self._new_transaction(block[tx], block_time, block_height, index)) except: pass except Exception as err: self.log.error("_get_missed exception %s " % str(err)) self.log.error(str(traceback.format_exc())) - self.await_tx_list = set() + self.await_tx = set() self.block_txs_request.cancel() elif self.get_missed_tx_threads <= self.get_missed_tx_threads_limit: self.get_missed_tx_threads += 1 # start more threads - if len(self.missed_tx_list) > 1: + if len(self.missed_tx) > 1: self.loop.create_task(self._get_missed(False, block_time, block_height)) while True: - if not self.missed_tx_list: + if not self.missed_tx: break try: batch = list() - while self.missed_tx_list: - batch.append(["getrawtransaction", self.missed_tx_list.pop()]) + while self.missed_tx: + batch.append(["getrawtransaction", self.missed_tx.pop()]) if len(batch) >= self.batch_limit: break result = await self.rpc.batch(batch) @@ -407,61 +475,96 @@ class Connector: except Exception as err: self.log.error("_get_missed exception %s " % str(err)) self.log.error(str(traceback.format_exc())) - self.await_tx_list = set() + self.await_tx = set() self.block_txs_request.cancel() self.get_missed_tx_threads -= 1 + async def wait_block_dependences(self, tx): + while self.await_tx_future: + for i in tx["vIn"]: + try: + if not self.await_tx_future[tx["vIn"][i]["txId"]].done(): + await self.await_tx_future[tx["vIn"][i]["txId"]] + break + except: + pass + else: + break + + async def _new_transaction(self, tx, block_time = None, block_height = None, block_index = None): - tx_hash = rh2s(tx["txId"]) - if tx_hash in self.tx_in_process or self.tx_cache.get(tx["txId"]): - return - try: - ft = self.await_tx_future if block_height is not None else None - self.tx_in_process.add(tx_hash) - - if self.tx_handler: - await self.tx_handler(tx, ft, block_time, block_height, block_index) - - self.tx_cache.set(tx_hash, True) + if not(tx["txId"] in self.tx_in_process or self.tx_cache.get(tx["txId"])): try: - self.await_tx_list.remove(tx_hash) - if not self.await_tx_future[tx_hash].done(): - self.await_tx_future[tx_hash].set_result(True) - if not self.await_tx_list: - self.block_txs_request.set_result(True) - except: - pass - except DependsTransaction as err: - self.block_dependency_tx += 1 - self.loop.create_task(self.wait_tx_then_add(err.raw_tx_hash, tx)) - except Exception as err: - if tx_hash in self.await_tx_list: - self.await_tx_list = set() - self.block_txs_request.cancel() - for i in self.await_tx_future: - if not self.await_tx_future[i].done(): - self.await_tx_future[i].cancel() - self.log.debug("new transaction error %s " % err) - self.log.debug(str(traceback.format_exc())) - finally: - self.tx_in_process.remove(tx_hash) + self.tx_in_process.add(tx["txId"]) + if not tx["coinbase"]: + if block_height is not None: + await self.wait_block_dependences(tx) + if self.utxo: + stxo = await self.get_stxo(tx, block_height, block_index) + else: + stxo = None + if self.tx_handler and not self.cache_loading: + await self.tx_handler(tx, stxo, block_time, block_height, block_index) + if self.utxo: + self.put_utxo(tx, block_height, block_index) + self.tx_cache.set(tx["txId"], True) + try: + self.await_tx.remove(tx["txId"]) + if not self.await_tx_future[tx["txId"]].done(): + self.await_tx_future[tx["txId"]].set_result(True) + if not self.await_tx: + self.block_txs_request.set_result(True) + except: + pass + except Exception as err: + if tx["txId"] in self.await_tx: + self.await_tx = set() + self.block_txs_request.cancel() + for i in self.await_tx_future: + if not self.await_tx_future[i].done(): + self.await_tx_future[i].cancel() + self.log.debug("new transaction error %s " % err) + self.log.debug(str(traceback.format_exc())) + finally: + self.tx_in_process.remove(tx["txId"]) + def put_utxo(self, tx, block_height, block_index): + for i in tx["vOut"]: + out = tx["vOut"][i] + if self.skip_opreturn and out["nType"] in (3, 8): + continue + pointer = (block_height << 42) + (block_index << 21) + i + if "addressHash" not in out: + address = out["scriptPubKey"] + else: + address = b"%s%s" % (bytes([out["nType"]]), out["addressHash"]) + outpoint = b"%s%s" % (tx["txId"], int_to_bytes(i)) + self.utxo.set(outpoint, pointer, out["value"], address) - async def wait_tx_then_add(self, raw_tx_hash, tx): - tx_hash = rh2s(tx["hash"]) - try: - if not self.await_tx_future[raw_tx_hash].done(): - await self.await_tx_future[raw_tx_hash] - self.loop.create_task(self._new_transaction(tx)) - except: - self.tx_in_process.remove(tx_hash) + async def get_stxo(self, tx, block_height, block_index): + stxo, missed = set(), set() + block_height = 0 if block_height is None else block_height + block_index = 0 if block_index is None else block_index + for i in tx["vIn"]: + inp = tx["vIn"][i] + outpoint = b"%s%s" % (inp["txId"], int_to_bytes(inp["vOut"])) + r = self.utxo.get(outpoint) + stxo.add(r) if r else missed.add((outpoint, (block_height << 42) + (block_index << 21) + i)) + if missed: + await self.utxo.load_utxo() + [stxo.add(self.utxo.get_loaded(o, block_height)) for o, s in missed] + if len(stxo) != len(tx["vIn"]): + self.log.critical("utxo get failed " + rh2s(tx["txId"])) + self.log.critical(str(stxo)) + raise Exception("utxo get failed ") + return stxo async def preload_block_hashes(self): @@ -527,7 +630,167 @@ class Connector: self.log.warning('Node connector terminated') +class UTXO(): + def __init__(self, db_pool, loop, log, cache_size): + self.cached = OrderedDict() + self.missed = set() + self.destroyed = OrderedDict() + self.log = log + self.loaded = OrderedDict() + self.maturity = 100 + self._cache_size = cache_size + self._db_pool = db_pool + self.loop = loop + self.clear_tail = False + self.last_saved_block = 0 + self.last_cached_block = 0 + self.save_process = False + self.load_utxo_future = asyncio.Future() + self.load_utxo_future.set_result(True) + self._requests = 0 + self._failed_requests = 0 + self._hit = 0 + def set(self, outpoint, pointer, amount, address): + if len(self.cached) > self._cache_size and not self.save_process: + self.loop.create_task(self.save_utxo()) + self.cached[outpoint] = (pointer, amount, address) + if pointer: + self.last_cached_block = pointer >> 42 + + def remove(self, outpoint): + del self.cached[outpoint] + + async def save_utxo(self, block_height): + # save to db tail from cache + block_height -= self.maturity + if block_height > 0 and not self.save_process: + c = len(self.cached) - self._cache_size + try: + self.save_process = True + + for key in iter(self.destroyed): + if key < block_height: + n = set() + for i in self.destroyed[key]: + try: + del self.cached[i] + except: + try: + del self.loaded[i] + n.add(i) + except: + pass + self.destroyed[key] = n + + + ln, rs, lb = set(), set(), 0 + for key in iter(self.cached): + i = self.cached[key] + if (c>0 or lb == i[0] >> 42) and (i[0] >> 42) < block_height: + rs.add((key,b"".join((int_to_c_int(i[0]), + int_to_c_int(i[1]), + i[2])))) + ln.add(key) + lb = i[0] >> 42 + c -= 1 + continue + break + + + r = set() + db = set() + for key in iter(self.destroyed): + if key <= lb and key < block_height: + db.add(key) + [r.add(i) for i in self.destroyed[key]] + + # insert to db + async with self._db_pool.acquire() as conn: + async with conn.transaction(): + await conn.execute("DELETE FROM connector_utxo WHERE " + "outpoint = ANY($1);", r) + await conn.copy_records_to_table('connector_utxo', columns=["outpoint", "data"], records=rs) + await conn.execute("UPDATE connector_utxo_state SET value = %s " + "WHERE name = 'last_block';", lb) + await conn.execute("UPDATE connector_utxo_state SET value = %s " + "WHERE name = 'last_cached_block';", block_height) + + # remove from cache + for key in ln: + try: + self.cached.pop(key) + except: + pass + + [self.destroyed.pop(key) for key in db] + self.last_saved_block = lb + finally: + self.save_process = False + + def get(self, key, block_height): + self._requests += 1 + try: + i = self.cached[key] + try: + self.destroyed[block_height].add(key) + except: + self.destroyed[block_height] = {key} + self._hit += 1 + return i + except: + self._failed_requests += 1 + self.missed.add(key) + return None + + def get_loaded(self, key, block_height): + try: + i = self.loaded[key] + try: + self.destroyed[block_height].add(key) + except: + self.destroyed[block_height] = {key} + return i + except: + return None + + async def load_utxo(self): + while True: + if not self.load_utxo_future.done(): + await self.load_utxo_future + continue + break + try: + self.load_utxo_future = asyncio.Future() + l = set(self.missed) + async with self._db_pool.acquire() as conn: + rows = await conn.fetch("SELECT outpoint, utxo.data FROM connector_utxo " + "WHERE outpoint = ANY($1);", l) + for i in l: + try: + self.missed.remove(i) + except: + pass + for row in rows: + d = row["data"] + pointer = c_int_to_int(d) + f = c_int_len(pointer) + amount = c_int_to_int(d[f:]) + f += c_int_len(amount) + address = d[f:] + self.loaded[row["outpoint"]] = (pointer, amount, address) + finally: + self.load_utxo_future.set_result(True) + + + def len(self): + return len(self.cached) + + def hit_rate(self): + if self._requests: + return self._hit / self._requests + else: + return 0 @@ -602,6 +865,14 @@ class Cache(): except: return None + def get_last(self): + try: + i = next(reversed(self._store)) + data = self._store[i] + return data + except: + return None + def len(self): return len(self._store)