connector
This commit is contained in:
parent
50d6cadef0
commit
abf6830f4f
@ -4,7 +4,7 @@ from pybtc.connector.utxo import UTXO
|
||||
from pybtc.connector.utils import decode_block_tx
|
||||
from pybtc.connector.utils import Cache
|
||||
from pybtc.transaction import Transaction
|
||||
from pybtc import int_to_bytes
|
||||
from pybtc import int_to_bytes, bytes_to_int
|
||||
|
||||
import traceback
|
||||
import aiojsonrpc
|
||||
@ -28,7 +28,7 @@ class Connector:
|
||||
skip_opreturn=True,
|
||||
block_preload_cache_limit= 1000 * 1000000,
|
||||
block_hashes_cache_limit= 200 * 1000000,
|
||||
postgres_pool=None):
|
||||
db=None):
|
||||
self.loop = asyncio.get_event_loop()
|
||||
|
||||
# settings
|
||||
@ -48,7 +48,7 @@ class Connector:
|
||||
self.deep_sync_limit = deep_sync_limit
|
||||
self.backlog = backlog
|
||||
self.mempool_tx = mempool_tx
|
||||
self.postgress_pool = postgres_pool
|
||||
self.db = db
|
||||
self.utxo_cache_size = utxo_cache_size
|
||||
self.utxo_data = utxo_data
|
||||
self.chain_tail = list(chain_tail) if chain_tail else []
|
||||
@ -145,7 +145,7 @@ class Connector:
|
||||
break
|
||||
|
||||
if self.utxo_data:
|
||||
self.utxo = UTXO(self.postgress_pool,
|
||||
self.utxo = UTXO(self.db,
|
||||
self.loop,
|
||||
self.log,
|
||||
self.utxo_cache_size if self.deep_synchronization else 0)
|
||||
@ -166,29 +166,18 @@ class Connector:
|
||||
|
||||
async def utxo_init(self):
|
||||
if self.utxo_data:
|
||||
if self.postgress_pool is None:
|
||||
raise Exception("UTXO data required postgresql db connection pool")
|
||||
if self.db is None:
|
||||
raise Exception("UTXO data required db connection")
|
||||
import rocksdb
|
||||
lb = self.db.get(b"last_block")
|
||||
if lb is None:
|
||||
lb = 0
|
||||
self.db.set(b"last_block", int_to_bytes(0))
|
||||
self.db.set(b"last_cached_block", int_to_bytes(0))
|
||||
else:
|
||||
lb = bytes_to_int(lb)
|
||||
lc = bytes_to_int(self.db.get(b"last_cached_block"))
|
||||
|
||||
async with self.postgress_pool.acquire() as conn:
|
||||
await conn.execute("""CREATE TABLE IF NOT EXISTS
|
||||
connector_utxo (outpoint BYTEA,
|
||||
data BYTEA,
|
||||
PRIMARY KEY(outpoint));
|
||||
""")
|
||||
await conn.execute("""CREATE TABLE IF NOT EXISTS
|
||||
connector_utxo_state (name VARCHAR,
|
||||
value BIGINT,
|
||||
PRIMARY KEY(name));
|
||||
""")
|
||||
lb = await conn.fetchval("SELECT value FROM connector_utxo_state WHERE name='last_block';")
|
||||
lc = await conn.fetchval("SELECT value FROM connector_utxo_state WHERE name='last_cached_block';")
|
||||
if lb is None:
|
||||
lb = 0
|
||||
lc = 0
|
||||
await conn.execute("INSERT INTO connector_utxo_state (name, value) "
|
||||
"VALUES ('last_block', 0);")
|
||||
await conn.execute("INSERT INTO connector_utxo_state (name, value) "
|
||||
"VALUES ('last_cached_block', 0);")
|
||||
self.last_block_height = lb
|
||||
self.last_block_utxo_cached_height = lc
|
||||
if self.app_block_height_on_start:
|
||||
|
||||
@ -1,11 +1,12 @@
|
||||
from pybtc import int_to_c_int, c_int_to_int, c_int_len
|
||||
from pybtc import int_to_bytes, bytes_to_int
|
||||
import asyncio
|
||||
from collections import OrderedDict
|
||||
from pybtc import MRU
|
||||
import traceback
|
||||
|
||||
class UTXO():
|
||||
def __init__(self, db_pool, loop, log, cache_size):
|
||||
def __init__(self, db, loop, log, cache_size):
|
||||
self.cached = MRU()
|
||||
self.missed = set()
|
||||
self.deleted = set()
|
||||
@ -18,7 +19,7 @@ class UTXO():
|
||||
self.pending_saved = OrderedDict()
|
||||
self.maturity = 100
|
||||
self.size_limit = cache_size
|
||||
self._db_pool = db_pool
|
||||
self.db = db
|
||||
self.loop = loop
|
||||
self.clear_tail = False
|
||||
self.last_saved_block = 0
|
||||
@ -99,19 +100,13 @@ class UTXO():
|
||||
self.write_to_db = True
|
||||
if not self.checkpoint: return
|
||||
|
||||
batch = rocksdb.WriteBatch()
|
||||
[batch.delete(k) for k in self.pending_deleted]
|
||||
[batch.put(k[0], k[1]) for k in self.pending_utxo]
|
||||
batch.put(b"last_block", int_to_bytes(self.checkpoint))
|
||||
batch.put(b"last_cached_block", int_to_bytes(self.deleted_last_block))
|
||||
self.db.write(batch)
|
||||
|
||||
async with self._db_pool.acquire() as conn:
|
||||
async with conn.transaction():
|
||||
if self.pending_deleted:
|
||||
await conn.execute("DELETE FROM connector_utxo WHERE "
|
||||
"outpoint = ANY($1);", self.pending_deleted)
|
||||
if self.pending_utxo:
|
||||
await conn.copy_records_to_table('connector_utxo',
|
||||
columns=["outpoint", "data"], records=self.pending_utxo)
|
||||
await conn.execute("UPDATE connector_utxo_state SET value = $1 "
|
||||
"WHERE name = 'last_block';", self.checkpoint)
|
||||
await conn.execute("UPDATE connector_utxo_state SET value = $1 "
|
||||
"WHERE name = 'last_cached_block';", self.deleted_last_block)
|
||||
self.saved_utxo += len(self.pending_utxo)
|
||||
self.deleted_utxo += len(self.pending_deleted)
|
||||
self.pending_deleted = set()
|
||||
@ -157,10 +152,9 @@ class UTXO():
|
||||
try:
|
||||
self.load_utxo_future = asyncio.Future()
|
||||
l = set(self.missed)
|
||||
async with self._db_pool.acquire() as conn:
|
||||
rows = await conn.fetch("SELECT outpoint, connector_utxo.data "
|
||||
"FROM connector_utxo "
|
||||
"WHERE outpoint = ANY($1);", l)
|
||||
rows = []
|
||||
[rows.append({"outpoint": k, "data": self.get(k)}) for k in l]
|
||||
|
||||
for i in l:
|
||||
try:
|
||||
self.missed.remove(i)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user