diff --git a/pybtc/connector/connector.py b/pybtc/connector/connector.py index b615b8f..2fae7c1 100644 --- a/pybtc/connector/connector.py +++ b/pybtc/connector/connector.py @@ -28,6 +28,7 @@ class Connector: skip_opreturn=True, block_preload_cache_limit= 1000 * 1000000, block_hashes_cache_limit= 200 * 1000000, + db_type=None, db=None): self.loop = asyncio.get_event_loop() @@ -48,6 +49,7 @@ class Connector: self.deep_sync_limit = deep_sync_limit self.backlog = backlog self.mempool_tx = mempool_tx + self.db_type = db_type self.db = db self.utxo_cache_size = utxo_cache_size self.utxo_data = utxo_data @@ -145,9 +147,8 @@ class Connector: break if self.utxo_data: - self.utxo = UTXO(self.db, - self.loop, - self.log, + self.utxo = UTXO(self.db_type, self.db, + self.loop, self.log, self.utxo_cache_size if self.deep_synchronization else 0) h = self.last_block_height @@ -166,17 +167,43 @@ class Connector: async def utxo_init(self): if self.utxo_data: - if self.db is None: + if self.db_type is None: raise Exception("UTXO data required db connection") - import rocksdb - lb = self.db.get(b"last_block") - if lb is None: - lb = 0 - self.db.put(b"last_block", int_to_bytes(0)) - self.db.put(b"last_cached_block", int_to_bytes(0)) + if self.db_type not in ("rocksdb", "leveldb", "postgresql"): + raise Exception("Connector supported database types is: rocksdb, leveldb, postgresql") + if self.db_type in ("rocksdb", "leveldb"): + # rocksdb and leveldb + lb = self.db.get(b"last_block") + if lb is None: + lb = 0 + self.db.put(b"last_block", int_to_bytes(0)) + self.db.put(b"last_cached_block", int_to_bytes(0)) + else: + lb = bytes_to_int(lb) + lc = bytes_to_int(self.db.get(b"last_cached_block")) else: - lb = bytes_to_int(lb) - lc = bytes_to_int(self.db.get(b"last_cached_block")) + # postgresql + async with self.db.acquire() as conn: + await conn.execute("""CREATE TABLE IF NOT EXISTS + connector_utxo (outpoint BYTEA, + data BYTEA, + PRIMARY KEY(outpoint)); + """) + await conn.execute("""CREATE TABLE IF NOT EXISTS + connector_utxo_state (name VARCHAR, + value BIGINT, + PRIMARY KEY(name)); + """) + lb = await conn.fetchval("SELECT value FROM connector_utxo_state WHERE name='last_block';") + lc = await conn.fetchval("SELECT value FROM connector_utxo_state WHERE name='last_cached_block';") + if lb is None: + lb = 0 + lc = 0 + await conn.execute("INSERT INTO connector_utxo_state (name, value) " + "VALUES ('last_block', 0);") + await conn.execute("INSERT INTO connector_utxo_state (name, value) " + "VALUES ('last_cached_block', 0);") + self.last_block_height = lb self.last_block_utxo_cached_height = lc diff --git a/pybtc/connector/utxo.py b/pybtc/connector/utxo.py index 92a1358..eea5dcb 100644 --- a/pybtc/connector/utxo.py +++ b/pybtc/connector/utxo.py @@ -4,10 +4,15 @@ import asyncio from collections import OrderedDict from pybtc import MRU import traceback -import rocksdb + +try: import rocksdb +except: pass +try: import plyvel +except: pass + class UTXO(): - def __init__(self, db, loop, log, cache_size): + def __init__(self, db_type, db, loop, log, cache_size): self.cached = MRU() self.missed = set() self.deleted = set() @@ -20,6 +25,7 @@ class UTXO(): self.pending_saved = OrderedDict() self.maturity = 100 self.size_limit = cache_size + self.db_type = db_type self.db = db self.loop = loop self.clear_tail = False @@ -54,7 +60,7 @@ class UTXO(): if not self.checkpoints: return self.save_process = True try: - self.log.critical("create checkpoint " + str( self.checkpoints)) + self.log.debug("create connector utxo checkpoint on block: " + str( self.checkpoints)) i = self.cached.peek_last_item() self.checkpoints = sorted(self.checkpoints) checkpoint = self.checkpoints.pop(0) @@ -96,32 +102,61 @@ class UTXO(): def rocksdb_atomic_batch(self): batch = rocksdb.WriteBatch() [batch.delete(k) for k in self.pending_deleted] - for k in self.pending_utxo: - batch.put(k[0], k[1]) + [batch.put(k[0], k[1]) for k in self.pending_utxo] batch.put(b"last_block", int_to_bytes(self.checkpoint)) batch.put(b"last_cached_block", int_to_bytes(self.deleted_last_block)) self.db.write(batch) + def leveldb_atomic_batch(self): + with self.db.write_batch() as batch: + [batch.delete(k) for k in self.pending_deleted] + [batch.put(k[0], k[1]) for k in self.pending_utxo] + batch.put(b"last_block", int_to_bytes(self.checkpoint)) + batch.put(b"last_cached_block", int_to_bytes(self.deleted_last_block)) + + + async def postgresql_atomic_batch(self): + async with self.db.acquire() as conn: + async with conn.transaction(): + if self.pending_deleted: + await conn.execute("DELETE FROM connector_utxo WHERE " + "outpoint = ANY($1);", self.pending_deleted) + if self.pending_utxo: + await conn.copy_records_to_table('connector_utxo', + columns=["outpoint", "data"], records=self.pending_utxo) + await conn.execute("UPDATE connector_utxo_state SET value = $1 " + "WHERE name = 'last_block';", self.checkpoint) + await conn.execute("UPDATE connector_utxo_state SET value = $1 " + "WHERE name = 'last_cached_block';", self.deleted_last_block) + + async def save_checkpoint(self): - # save to db tail from cache - if not self.checkpoint: return - if self.write_to_db: return - try: - self.write_to_db = True - if not self.checkpoint: return - await self.loop.run_in_executor(None, self.rocksdb_atomic_batch) - self.saved_utxo += len(self.pending_utxo) - self.deleted_utxo += len(self.pending_deleted) - self.pending_deleted = set() - self.pending_utxo = set() - self.pending_saved = OrderedDict() - self.last_saved_block = self.checkpoint - self.checkpoint = None - except Exception as err: - self.log.critical("save_checkpoint error: %s" % str(err)) - finally: - self.save_process = False - self.write_to_db = False + # save to db tail from cache + if not self.checkpoint: return + if self.write_to_db: return + try: + self.write_to_db = True + + if not self.checkpoint: return + if self.db_type == "rocksdb": + await self.loop.run_in_executor(None, self.rocksdb_atomic_batch) + elif self.db_type == "leveldb": + await self.loop.run_in_executor(None, self.leveldb_atomic_batch) + else: + await self.postgresql_atomic_batch() + + self.saved_utxo += len(self.pending_utxo) + self.deleted_utxo += len(self.pending_deleted) + self.pending_deleted = set() + self.pending_utxo = set() + self.pending_saved = OrderedDict() + self.last_saved_block = self.checkpoint + self.checkpoint = None + except Exception as err: + self.log.critical("save_checkpoint error: %s" % str(err)) + finally: + self.save_process = False + self.write_to_db = False def get(self, key): self._requests += 1 @@ -146,6 +181,7 @@ class UTXO(): except: return None + async def load_utxo(self): while True: if not self.load_utxo_future.done(): @@ -155,22 +191,51 @@ class UTXO(): try: self.load_utxo_future = asyncio.Future() l = list(self.missed) - rows = self.db.multi_get(l) + if self.db_type == "postgresql": + async with self.db.acquire() as conn: + rows = await conn.fetch("SELECT outpoint, connector_utxo.data " + "FROM connector_utxo " + "WHERE outpoint = ANY($1);", l) + for row in rows: + d = row["data"] + pointer = c_int_to_int(d) + f = c_int_len(pointer) + amount = c_int_to_int(d[f:]) + f += c_int_len(amount) + address = d[f:] + self.loaded[row["outpoint"]] = (pointer, amount, address) + self.loaded_utxo += 1 + + + elif self.db_type == "rocksdb": + rows = self.db.multi_get(l) + for outpoint in rows: + d = rows[outpoint] + pointer = c_int_to_int(d) + f = c_int_len(pointer) + amount = c_int_to_int(d[f:]) + f += c_int_len(amount) + address = d[f:] + self.loaded[outpoint] = (pointer, amount, address) + self.loaded_utxo += 1 + else: + for outpoint in l: + d = self.db.get(outpoint) + if d is None: continue + pointer = c_int_to_int(d) + f = c_int_len(pointer) + amount = c_int_to_int(d[f:]) + f += c_int_len(amount) + address = d[f:] + self.loaded[outpoint] = (pointer, amount, address) + self.loaded_utxo += 1 + for i in l: - try: - self.missed.remove(i) - except: - pass - for outpoint in rows: - d = rows[outpoint] - pointer = c_int_to_int(d) - f = c_int_len(pointer) - amount = c_int_to_int(d[f:]) - f += c_int_len(amount) - address = d[f:] - self.loaded[outpoint] = (pointer, amount, address) - self.loaded_utxo += 1 + try: self.missed.remove(i) + except: pass + + finally: self.load_utxo_future.set_result(True)