From abf6830f4f5bcb6c47756b7cc9ca1bf0736836af Mon Sep 17 00:00:00 2001 From: 4tochka Date: Mon, 27 May 2019 21:32:19 +0400 Subject: [PATCH] connector --- pybtc/connector/connector.py | 41 +++++++++++++----------------------- pybtc/connector/utxo.py | 30 +++++++++++--------------- 2 files changed, 27 insertions(+), 44 deletions(-) diff --git a/pybtc/connector/connector.py b/pybtc/connector/connector.py index 659f909..df78673 100644 --- a/pybtc/connector/connector.py +++ b/pybtc/connector/connector.py @@ -4,7 +4,7 @@ from pybtc.connector.utxo import UTXO from pybtc.connector.utils import decode_block_tx from pybtc.connector.utils import Cache from pybtc.transaction import Transaction -from pybtc import int_to_bytes +from pybtc import int_to_bytes, bytes_to_int import traceback import aiojsonrpc @@ -28,7 +28,7 @@ class Connector: skip_opreturn=True, block_preload_cache_limit= 1000 * 1000000, block_hashes_cache_limit= 200 * 1000000, - postgres_pool=None): + db=None): self.loop = asyncio.get_event_loop() # settings @@ -48,7 +48,7 @@ class Connector: self.deep_sync_limit = deep_sync_limit self.backlog = backlog self.mempool_tx = mempool_tx - self.postgress_pool = postgres_pool + self.db = db self.utxo_cache_size = utxo_cache_size self.utxo_data = utxo_data self.chain_tail = list(chain_tail) if chain_tail else [] @@ -145,7 +145,7 @@ class Connector: break if self.utxo_data: - self.utxo = UTXO(self.postgress_pool, + self.utxo = UTXO(self.db, self.loop, self.log, self.utxo_cache_size if self.deep_synchronization else 0) @@ -166,29 +166,18 @@ class Connector: async def utxo_init(self): if self.utxo_data: - if self.postgress_pool is None: - raise Exception("UTXO data required postgresql db connection pool") + if self.db is None: + raise Exception("UTXO data required db connection") + import rocksdb + lb = self.db.get(b"last_block") + if lb is None: + lb = 0 + self.db.set(b"last_block", int_to_bytes(0)) + self.db.set(b"last_cached_block", int_to_bytes(0)) + else: + lb = bytes_to_int(lb) + lc = bytes_to_int(self.db.get(b"last_cached_block")) - async with self.postgress_pool.acquire() as conn: - await conn.execute("""CREATE TABLE IF NOT EXISTS - connector_utxo (outpoint BYTEA, - data BYTEA, - PRIMARY KEY(outpoint)); - """) - await conn.execute("""CREATE TABLE IF NOT EXISTS - connector_utxo_state (name VARCHAR, - value BIGINT, - PRIMARY KEY(name)); - """) - lb = await conn.fetchval("SELECT value FROM connector_utxo_state WHERE name='last_block';") - lc = await conn.fetchval("SELECT value FROM connector_utxo_state WHERE name='last_cached_block';") - if lb is None: - lb = 0 - lc = 0 - await conn.execute("INSERT INTO connector_utxo_state (name, value) " - "VALUES ('last_block', 0);") - await conn.execute("INSERT INTO connector_utxo_state (name, value) " - "VALUES ('last_cached_block', 0);") self.last_block_height = lb self.last_block_utxo_cached_height = lc if self.app_block_height_on_start: diff --git a/pybtc/connector/utxo.py b/pybtc/connector/utxo.py index 29590e6..df46ea4 100644 --- a/pybtc/connector/utxo.py +++ b/pybtc/connector/utxo.py @@ -1,11 +1,12 @@ from pybtc import int_to_c_int, c_int_to_int, c_int_len +from pybtc import int_to_bytes, bytes_to_int import asyncio from collections import OrderedDict from pybtc import MRU import traceback class UTXO(): - def __init__(self, db_pool, loop, log, cache_size): + def __init__(self, db, loop, log, cache_size): self.cached = MRU() self.missed = set() self.deleted = set() @@ -18,7 +19,7 @@ class UTXO(): self.pending_saved = OrderedDict() self.maturity = 100 self.size_limit = cache_size - self._db_pool = db_pool + self.db = db self.loop = loop self.clear_tail = False self.last_saved_block = 0 @@ -99,19 +100,13 @@ class UTXO(): self.write_to_db = True if not self.checkpoint: return + batch = rocksdb.WriteBatch() + [batch.delete(k) for k in self.pending_deleted] + [batch.put(k[0], k[1]) for k in self.pending_utxo] + batch.put(b"last_block", int_to_bytes(self.checkpoint)) + batch.put(b"last_cached_block", int_to_bytes(self.deleted_last_block)) + self.db.write(batch) - async with self._db_pool.acquire() as conn: - async with conn.transaction(): - if self.pending_deleted: - await conn.execute("DELETE FROM connector_utxo WHERE " - "outpoint = ANY($1);", self.pending_deleted) - if self.pending_utxo: - await conn.copy_records_to_table('connector_utxo', - columns=["outpoint", "data"], records=self.pending_utxo) - await conn.execute("UPDATE connector_utxo_state SET value = $1 " - "WHERE name = 'last_block';", self.checkpoint) - await conn.execute("UPDATE connector_utxo_state SET value = $1 " - "WHERE name = 'last_cached_block';", self.deleted_last_block) self.saved_utxo += len(self.pending_utxo) self.deleted_utxo += len(self.pending_deleted) self.pending_deleted = set() @@ -157,10 +152,9 @@ class UTXO(): try: self.load_utxo_future = asyncio.Future() l = set(self.missed) - async with self._db_pool.acquire() as conn: - rows = await conn.fetch("SELECT outpoint, connector_utxo.data " - "FROM connector_utxo " - "WHERE outpoint = ANY($1);", l) + rows = [] + [rows.append({"outpoint": k, "data": self.get(k)}) for k in l] + for i in l: try: self.missed.remove(i)