diff --git a/pybtc/connector.py b/pybtc/connector.py index 4a6b271..e0487c3 100644 --- a/pybtc/connector.py +++ b/pybtc/connector.py @@ -744,6 +744,7 @@ class UTXO(): self._hit = 0 self.saved_utxo = 0 self.deleted_utxo = 0 + self.deleted_utxo_saved = 0 self.loaded_utxo = 0 self.destroyed_utxo = 0 self.destroyed_utxo_block = 0 @@ -758,90 +759,87 @@ class UTXO(): def destroy_utxo(self, block_height): block_height -= self.maturity - self.destroyed_utxo_block = block_height - k = set() - for key in self.destroyed: - if key < block_height: - k.add(key) - n = set() - for outpoint in self.destroyed[key]: + + for key in range(self.destroyed_utxo_block + 1, block_height + 1): + if key not in self.destroyed: continue + n = set() + for outpoint in self.destroyed[key]: + try: + del self.cached[outpoint] + self.destroyed_utxo += 1 + except: try: - del self.cached[outpoint] - self.destroyed_utxo += 1 + del self.loaded[outpoint] + n.add(outpoint) except: - try: - del self.loaded[outpoint] - n.add(outpoint) - except: - pass - self.deleted[key] = n - [self.destroyed.pop(i) for i in k] - if len(self.cached) - self._cache_size > 0: + pass + self.deleted[key] = n + self.destroyed.pop(key) + + self.destroyed_utxo_block = block_height + if len(self.cached) - self._cache_size > 0 and not self.save_process: self.loop.create_task(self.save_utxo(block_height)) async def save_utxo(self, block_height): # save to db tail from cache + self.save_process = True c = len(self.cached) - self._cache_size - if not self.save_process: - try: - self.save_process = True - lb = 0 - ln, rs = set(), set() - r = set() - db = set() + try: + lb = 0 + for key in iter(self.cached): + i = self.cached[key] + if c>0 and (i[0] >> 42) <= block_height: + c -= 1 + continue + break + + if lb: + d = set() + for key in range(self.last_saved_block + 1, lb + 1): + try: + [d.add(i) for i in self.deleted[key]] + except: + pass + + a = set() for key in iter(self.cached): i = self.cached[key] - if (c>0 or lb == i[0] >> 42) and (i[0] >> 42) < block_height: - c -= 1 - continue - break + if (i[0] >> 42) > lb: break + a.add((key,b"".join((int_to_c_int(i[0]), + int_to_c_int(i[1]), + i[2])))) - if lb: - d = set() - for key in range(self.last_saved_block + 1, lb + 1): - try: - [d.add(i) for i in self.deleted[key]] - except: - pass + # insert to db + async with self._db_pool.acquire() as conn: + async with conn.transaction(): + if d: + await conn.execute("DELETE FROM connector_utxo WHERE " + "outpoint = ANY($1);", d) + if a: + await conn.copy_records_to_table('connector_utxo', + columns=["outpoint", "data"], records=a) + await conn.execute("UPDATE connector_utxo_state SET value = $1 " + "WHERE name = 'last_block';", lb) + await conn.execute("UPDATE connector_utxo_state SET value = $1 " + "WHERE name = 'last_cached_block';", block_height) + self.saved_utxo += len(a) + self.deleted_utxo += len(d) - a = set() - for key in range(self.last_saved_block + 1, lb + 1): - i = self.cached[key] - a.add((key,b"".join((int_to_c_int(i[0]), - int_to_c_int(i[1]), - i[2])))) + # remove from cache + for key in a: + try: + self.cached.pop(key[0]) + except: + pass - # insert to db - async with self._db_pool.acquire() as conn: - async with conn.transaction(): - if d: - await conn.execute("DELETE FROM connector_utxo WHERE " - "outpoint = ANY($1);", d) - if a: - await conn.copy_records_to_table('connector_utxo', - columns=["outpoint", "data"], records=a) - await conn.execute("UPDATE connector_utxo_state SET value = $1 " - "WHERE name = 'last_block';", lb) - await conn.execute("UPDATE connector_utxo_state SET value = $1 " - "WHERE name = 'last_cached_block';", block_height) - self.saved_utxo += len(a) - self.deleted_utxo += len(d) - - # remove from cache - for key in a: - try: - self.cached.pop(key[0]) - except: - pass - - for key in range(self.last_saved_block + 1, lb + 1): - try: - self.deleted.pop(key) - except: - pass - self.last_saved_block = lb - finally: - self.save_process = False + for key in range(self.last_saved_block + 1, lb + 1): + try: + self.deleted.pop(key) + except: + pass + self.last_saved_block = lb + finally: + self.save_process = False def get(self, key, block_height): self._requests += 1