diff --git a/pybtc/connector/utxo.py b/pybtc/connector/utxo.py index db102cb..3fe92d6 100644 --- a/pybtc/connector/utxo.py +++ b/pybtc/connector/utxo.py @@ -19,8 +19,7 @@ class UTXO(): self.clear_tail = False self.last_saved_block = 0 self.last_cached_block = 0 - self.save_future = asyncio.Future() - self.save_future.set_result(True) + self.save_process = False self.load_utxo_future = asyncio.Future() self.load_utxo_future.set_result(True) self._requests = 0 @@ -60,71 +59,83 @@ class UTXO(): # save to db tail from cache self.log.critical("save utxo>>>>") return - if not self.save_future.done(): - await self.save_future.done() + if self.save_process or not self.cached: return - self.save_future = asyncio.Future() - while True: - c = len(self.cached) - self._cache_soft_limit - self.block_txo_max - if c <= 0: break - self.log.critical("str>>>>") - try: - lb = 0 - for key in iter(self.cached): - i = self.cached[key] - if c>0 and (i[0] >> 42) <= block_height: - c -= 1 - lb = i[0] >> 42 - continue - break + self.save_process = True + try: + lb = 0 + block_changed = False + utxo = set() + while self.cached: + i = self.cached.pop() + if lb != i[1][0] >> 42: + block_changed = True + lb = i[1][0] >> 42 + if self.cached <= self.size_limit: + if block_changed: + break + utxo.add((i[0],b"".join((int_to_c_int(i[1][0]), + int_to_c_int(i[1][1]), + i[1][2])))) + if block_changed: + self.cached.append({i[0]: i[1]}) - if lb: - d = set() - for key in range(self.last_saved_block + 1, lb + 1): - try: - [d.add(i) for i in self.deleted[key]] - except: - pass + # + # block_height + # for key in iter(self.cached): + # i = self.cached[key] + # if c>0 and (i[0] >> 42) <= block_height: + # c -= 1 + # lb = i[0] >> 42 + # continue + # break + # + # if lb: + # d = set() + # for key in range(self.last_saved_block + 1, lb + 1): + # try: + # [d.add(i) for i in self.deleted[key]] + # except: + # pass + # + # a = set() + # for key in iter(self.cached): + # i = self.cached[key] + # if (i[0] >> 42) > lb: break + # a.add((key,b"".join((int_to_c_int(i[0]), + # int_to_c_int(i[1]), + # i[2])))) - a = set() - for key in iter(self.cached): - i = self.cached[key] - if (i[0] >> 42) > lb: break - a.add((key,b"".join((int_to_c_int(i[0]), - int_to_c_int(i[1]), - i[2])))) + # insert to db + d = set() + async with self._db_pool.acquire() as conn: + async with conn.transaction(): + if d: + await conn.execute("DELETE FROM connector_utxo WHERE " + "outpoint = ANY($1);", d) + if a: + await conn.copy_records_to_table('connector_utxo', + columns=["outpoint", "data"], records=utxo) + await conn.execute("UPDATE connector_utxo_state SET value = $1 " + "WHERE name = 'last_block';", lb) + self.saved_utxo += len(utxo) + self.deleted_utxo += len(d) - # insert to db - async with self._db_pool.acquire() as conn: - async with conn.transaction(): - if d: - await conn.execute("DELETE FROM connector_utxo WHERE " - "outpoint = ANY($1);", d) - if a: - await conn.copy_records_to_table('connector_utxo', - columns=["outpoint", "data"], records=a) - await conn.execute("UPDATE connector_utxo_state SET value = $1 " - "WHERE name = 'last_block';", lb) - await conn.execute("UPDATE connector_utxo_state SET value = $1 " - "WHERE name = 'last_cached_block';", block_height) - self.saved_utxo += len(a) - self.deleted_utxo += len(d) - - # remove from cache - for key in a: - try: - self.cached.pop(key[0]) - except: - pass - - for key in range(self.last_saved_block + 1, lb + 1): - try: - self.deleted.pop(key) - except: - pass - self.last_saved_block = lb - finally: - self.save_future = False + # # remove from cache + # for key in a: + # try: + # self.cached.pop(key[0]) + # except: + # pass + # + # for key in range(self.last_saved_block + 1, lb + 1): + # try: + # self.deleted.pop(key) + # except: + # pass + self.last_saved_block = lb + finally: + self.save_future = False def get(self, key): self._requests += 1