diff --git a/pybtc/connector.py b/pybtc/connector.py index eb9d0a8..4a6b271 100644 --- a/pybtc/connector.py +++ b/pybtc/connector.py @@ -413,6 +413,12 @@ class Connector: self.utxo.destroyed_utxo, self.utxo.destroyed_utxo_block )) + self.log.info( + "destroyed utxo %s; " + "destroyed utxo block %s;" % ( + self.utxo.destroyed_utxo, + self.utxo.destroyed_utxo_block + )) # after block added handler if self.after_block_handler and not self.cache_loading: @@ -770,13 +776,13 @@ class UTXO(): pass self.deleted[key] = n [self.destroyed.pop(i) for i in k] + if len(self.cached) - self._cache_size > 0: + self.loop.create_task(self.save_utxo(block_height)) async def save_utxo(self, block_height): - return # save to db tail from cache - block_height -= self.maturity c = len(self.cached) - self._cache_size - if block_height > 0 and not self.save_process and c > 0: + if not self.save_process: try: self.save_process = True lb = 0 @@ -789,73 +795,50 @@ class UTXO(): c -= 1 continue break - if lb: - - for key in self.destroyed: - if key < lb: - n = set() - for outpoint in self.destroyed[key]: - try: - del self.cached[outpoint] - self.destroyed_utxo += 1 - except: - try: - del self.loaded[outpoint] - n.add(outpoint) - except: - pass - self.destroyed[key] = n - self.log.critical(str(key)) - - - - for key in iter(self.cached): - i = self.cached[key] - if i[0] >> 42 <= lb: - rs.add((key,b"".join((int_to_c_int(i[0]), - int_to_c_int(i[1]), - i[2])))) - ln.add(key) - lb = i[0] >> 42 - c -= 1 - continue - break - # if not lb: - # await asyncio.sleep(0) - # return - - - for key in iter(self.destroyed): - if key <= lb and key < block_height: - db.add(key) - [r.add(i) for i in self.destroyed[key]] - - # insert to db - async with self._db_pool.acquire() as conn: - async with conn.transaction(): - if r: - await conn.execute("DELETE FROM connector_utxo WHERE " - "outpoint = ANY($1);", r) - if rs: - await conn.copy_records_to_table('connector_utxo', columns=["outpoint", "data"], records=rs) - if lb: - await conn.execute("UPDATE connector_utxo_state SET value = $1 " - "WHERE name = 'last_block';", lb) - await conn.execute("UPDATE connector_utxo_state SET value = $1 " - "WHERE name = 'last_cached_block';", block_height) - self.saved_utxo += len(rs) - self.deleted_utxo += len(r) if lb: - # remove from cache - for key in ln: + d = set() + for key in range(self.last_saved_block + 1, lb + 1): try: - self.cached.pop(key) + [d.add(i) for i in self.deleted[key]] + except: + pass + + a = set() + for key in range(self.last_saved_block + 1, lb + 1): + i = self.cached[key] + a.add((key,b"".join((int_to_c_int(i[0]), + int_to_c_int(i[1]), + i[2])))) + + # insert to db + async with self._db_pool.acquire() as conn: + async with conn.transaction(): + if d: + await conn.execute("DELETE FROM connector_utxo WHERE " + "outpoint = ANY($1);", d) + if a: + await conn.copy_records_to_table('connector_utxo', + columns=["outpoint", "data"], records=a) + await conn.execute("UPDATE connector_utxo_state SET value = $1 " + "WHERE name = 'last_block';", lb) + await conn.execute("UPDATE connector_utxo_state SET value = $1 " + "WHERE name = 'last_cached_block';", block_height) + self.saved_utxo += len(a) + self.deleted_utxo += len(d) + + # remove from cache + for key in a: + try: + self.cached.pop(key[0]) + except: + pass + + for key in range(self.last_saved_block + 1, lb + 1): + try: + self.deleted.pop(key) except: pass - for key in self.destroyed: - if not self.destroyed[key]: - self.destroyed.pop(key) self.last_saved_block = lb finally: self.save_process = False