connector
This commit is contained in:
parent
b921cd89f2
commit
13a163fb88
@ -28,6 +28,7 @@ class Connector:
|
|||||||
skip_opreturn=True,
|
skip_opreturn=True,
|
||||||
block_preload_cache_limit= 1000 * 1000000,
|
block_preload_cache_limit= 1000 * 1000000,
|
||||||
block_hashes_cache_limit= 200 * 1000000,
|
block_hashes_cache_limit= 200 * 1000000,
|
||||||
|
db_type=None,
|
||||||
db=None):
|
db=None):
|
||||||
self.loop = asyncio.get_event_loop()
|
self.loop = asyncio.get_event_loop()
|
||||||
|
|
||||||
@ -48,6 +49,7 @@ class Connector:
|
|||||||
self.deep_sync_limit = deep_sync_limit
|
self.deep_sync_limit = deep_sync_limit
|
||||||
self.backlog = backlog
|
self.backlog = backlog
|
||||||
self.mempool_tx = mempool_tx
|
self.mempool_tx = mempool_tx
|
||||||
|
self.db_type = db_type
|
||||||
self.db = db
|
self.db = db
|
||||||
self.utxo_cache_size = utxo_cache_size
|
self.utxo_cache_size = utxo_cache_size
|
||||||
self.utxo_data = utxo_data
|
self.utxo_data = utxo_data
|
||||||
@ -145,9 +147,8 @@ class Connector:
|
|||||||
break
|
break
|
||||||
|
|
||||||
if self.utxo_data:
|
if self.utxo_data:
|
||||||
self.utxo = UTXO(self.db,
|
self.utxo = UTXO(self.db_type, self.db,
|
||||||
self.loop,
|
self.loop, self.log,
|
||||||
self.log,
|
|
||||||
self.utxo_cache_size if self.deep_synchronization else 0)
|
self.utxo_cache_size if self.deep_synchronization else 0)
|
||||||
|
|
||||||
h = self.last_block_height
|
h = self.last_block_height
|
||||||
@ -166,17 +167,43 @@ class Connector:
|
|||||||
|
|
||||||
async def utxo_init(self):
|
async def utxo_init(self):
|
||||||
if self.utxo_data:
|
if self.utxo_data:
|
||||||
if self.db is None:
|
if self.db_type is None:
|
||||||
raise Exception("UTXO data required db connection")
|
raise Exception("UTXO data required db connection")
|
||||||
import rocksdb
|
if self.db_type not in ("rocksdb", "leveldb", "postgresql"):
|
||||||
lb = self.db.get(b"last_block")
|
raise Exception("Connector supported database types is: rocksdb, leveldb, postgresql")
|
||||||
if lb is None:
|
if self.db_type in ("rocksdb", "leveldb"):
|
||||||
lb = 0
|
# rocksdb and leveldb
|
||||||
self.db.put(b"last_block", int_to_bytes(0))
|
lb = self.db.get(b"last_block")
|
||||||
self.db.put(b"last_cached_block", int_to_bytes(0))
|
if lb is None:
|
||||||
|
lb = 0
|
||||||
|
self.db.put(b"last_block", int_to_bytes(0))
|
||||||
|
self.db.put(b"last_cached_block", int_to_bytes(0))
|
||||||
|
else:
|
||||||
|
lb = bytes_to_int(lb)
|
||||||
|
lc = bytes_to_int(self.db.get(b"last_cached_block"))
|
||||||
else:
|
else:
|
||||||
lb = bytes_to_int(lb)
|
# postgresql
|
||||||
lc = bytes_to_int(self.db.get(b"last_cached_block"))
|
async with self.db.acquire() as conn:
|
||||||
|
await conn.execute("""CREATE TABLE IF NOT EXISTS
|
||||||
|
connector_utxo (outpoint BYTEA,
|
||||||
|
data BYTEA,
|
||||||
|
PRIMARY KEY(outpoint));
|
||||||
|
""")
|
||||||
|
await conn.execute("""CREATE TABLE IF NOT EXISTS
|
||||||
|
connector_utxo_state (name VARCHAR,
|
||||||
|
value BIGINT,
|
||||||
|
PRIMARY KEY(name));
|
||||||
|
""")
|
||||||
|
lb = await conn.fetchval("SELECT value FROM connector_utxo_state WHERE name='last_block';")
|
||||||
|
lc = await conn.fetchval("SELECT value FROM connector_utxo_state WHERE name='last_cached_block';")
|
||||||
|
if lb is None:
|
||||||
|
lb = 0
|
||||||
|
lc = 0
|
||||||
|
await conn.execute("INSERT INTO connector_utxo_state (name, value) "
|
||||||
|
"VALUES ('last_block', 0);")
|
||||||
|
await conn.execute("INSERT INTO connector_utxo_state (name, value) "
|
||||||
|
"VALUES ('last_cached_block', 0);")
|
||||||
|
|
||||||
|
|
||||||
self.last_block_height = lb
|
self.last_block_height = lb
|
||||||
self.last_block_utxo_cached_height = lc
|
self.last_block_utxo_cached_height = lc
|
||||||
|
|||||||
@ -4,10 +4,15 @@ import asyncio
|
|||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
from pybtc import MRU
|
from pybtc import MRU
|
||||||
import traceback
|
import traceback
|
||||||
import rocksdb
|
|
||||||
|
try: import rocksdb
|
||||||
|
except: pass
|
||||||
|
try: import plyvel
|
||||||
|
except: pass
|
||||||
|
|
||||||
|
|
||||||
class UTXO():
|
class UTXO():
|
||||||
def __init__(self, db, loop, log, cache_size):
|
def __init__(self, db_type, db, loop, log, cache_size):
|
||||||
self.cached = MRU()
|
self.cached = MRU()
|
||||||
self.missed = set()
|
self.missed = set()
|
||||||
self.deleted = set()
|
self.deleted = set()
|
||||||
@ -20,6 +25,7 @@ class UTXO():
|
|||||||
self.pending_saved = OrderedDict()
|
self.pending_saved = OrderedDict()
|
||||||
self.maturity = 100
|
self.maturity = 100
|
||||||
self.size_limit = cache_size
|
self.size_limit = cache_size
|
||||||
|
self.db_type = db_type
|
||||||
self.db = db
|
self.db = db
|
||||||
self.loop = loop
|
self.loop = loop
|
||||||
self.clear_tail = False
|
self.clear_tail = False
|
||||||
@ -54,7 +60,7 @@ class UTXO():
|
|||||||
if not self.checkpoints: return
|
if not self.checkpoints: return
|
||||||
self.save_process = True
|
self.save_process = True
|
||||||
try:
|
try:
|
||||||
self.log.critical("create checkpoint " + str( self.checkpoints))
|
self.log.debug("create connector utxo checkpoint on block: " + str( self.checkpoints))
|
||||||
i = self.cached.peek_last_item()
|
i = self.cached.peek_last_item()
|
||||||
self.checkpoints = sorted(self.checkpoints)
|
self.checkpoints = sorted(self.checkpoints)
|
||||||
checkpoint = self.checkpoints.pop(0)
|
checkpoint = self.checkpoints.pop(0)
|
||||||
@ -96,32 +102,61 @@ class UTXO():
|
|||||||
def rocksdb_atomic_batch(self):
|
def rocksdb_atomic_batch(self):
|
||||||
batch = rocksdb.WriteBatch()
|
batch = rocksdb.WriteBatch()
|
||||||
[batch.delete(k) for k in self.pending_deleted]
|
[batch.delete(k) for k in self.pending_deleted]
|
||||||
for k in self.pending_utxo:
|
[batch.put(k[0], k[1]) for k in self.pending_utxo]
|
||||||
batch.put(k[0], k[1])
|
|
||||||
batch.put(b"last_block", int_to_bytes(self.checkpoint))
|
batch.put(b"last_block", int_to_bytes(self.checkpoint))
|
||||||
batch.put(b"last_cached_block", int_to_bytes(self.deleted_last_block))
|
batch.put(b"last_cached_block", int_to_bytes(self.deleted_last_block))
|
||||||
self.db.write(batch)
|
self.db.write(batch)
|
||||||
|
|
||||||
|
def leveldb_atomic_batch(self):
|
||||||
|
with self.db.write_batch() as batch:
|
||||||
|
[batch.delete(k) for k in self.pending_deleted]
|
||||||
|
[batch.put(k[0], k[1]) for k in self.pending_utxo]
|
||||||
|
batch.put(b"last_block", int_to_bytes(self.checkpoint))
|
||||||
|
batch.put(b"last_cached_block", int_to_bytes(self.deleted_last_block))
|
||||||
|
|
||||||
|
|
||||||
|
async def postgresql_atomic_batch(self):
|
||||||
|
async with self.db.acquire() as conn:
|
||||||
|
async with conn.transaction():
|
||||||
|
if self.pending_deleted:
|
||||||
|
await conn.execute("DELETE FROM connector_utxo WHERE "
|
||||||
|
"outpoint = ANY($1);", self.pending_deleted)
|
||||||
|
if self.pending_utxo:
|
||||||
|
await conn.copy_records_to_table('connector_utxo',
|
||||||
|
columns=["outpoint", "data"], records=self.pending_utxo)
|
||||||
|
await conn.execute("UPDATE connector_utxo_state SET value = $1 "
|
||||||
|
"WHERE name = 'last_block';", self.checkpoint)
|
||||||
|
await conn.execute("UPDATE connector_utxo_state SET value = $1 "
|
||||||
|
"WHERE name = 'last_cached_block';", self.deleted_last_block)
|
||||||
|
|
||||||
|
|
||||||
async def save_checkpoint(self):
|
async def save_checkpoint(self):
|
||||||
# save to db tail from cache
|
# save to db tail from cache
|
||||||
if not self.checkpoint: return
|
if not self.checkpoint: return
|
||||||
if self.write_to_db: return
|
if self.write_to_db: return
|
||||||
try:
|
try:
|
||||||
self.write_to_db = True
|
self.write_to_db = True
|
||||||
if not self.checkpoint: return
|
|
||||||
await self.loop.run_in_executor(None, self.rocksdb_atomic_batch)
|
if not self.checkpoint: return
|
||||||
self.saved_utxo += len(self.pending_utxo)
|
if self.db_type == "rocksdb":
|
||||||
self.deleted_utxo += len(self.pending_deleted)
|
await self.loop.run_in_executor(None, self.rocksdb_atomic_batch)
|
||||||
self.pending_deleted = set()
|
elif self.db_type == "leveldb":
|
||||||
self.pending_utxo = set()
|
await self.loop.run_in_executor(None, self.leveldb_atomic_batch)
|
||||||
self.pending_saved = OrderedDict()
|
else:
|
||||||
self.last_saved_block = self.checkpoint
|
await self.postgresql_atomic_batch()
|
||||||
self.checkpoint = None
|
|
||||||
except Exception as err:
|
self.saved_utxo += len(self.pending_utxo)
|
||||||
self.log.critical("save_checkpoint error: %s" % str(err))
|
self.deleted_utxo += len(self.pending_deleted)
|
||||||
finally:
|
self.pending_deleted = set()
|
||||||
self.save_process = False
|
self.pending_utxo = set()
|
||||||
self.write_to_db = False
|
self.pending_saved = OrderedDict()
|
||||||
|
self.last_saved_block = self.checkpoint
|
||||||
|
self.checkpoint = None
|
||||||
|
except Exception as err:
|
||||||
|
self.log.critical("save_checkpoint error: %s" % str(err))
|
||||||
|
finally:
|
||||||
|
self.save_process = False
|
||||||
|
self.write_to_db = False
|
||||||
|
|
||||||
def get(self, key):
|
def get(self, key):
|
||||||
self._requests += 1
|
self._requests += 1
|
||||||
@ -146,6 +181,7 @@ class UTXO():
|
|||||||
except:
|
except:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
async def load_utxo(self):
|
async def load_utxo(self):
|
||||||
while True:
|
while True:
|
||||||
if not self.load_utxo_future.done():
|
if not self.load_utxo_future.done():
|
||||||
@ -155,22 +191,51 @@ class UTXO():
|
|||||||
try:
|
try:
|
||||||
self.load_utxo_future = asyncio.Future()
|
self.load_utxo_future = asyncio.Future()
|
||||||
l = list(self.missed)
|
l = list(self.missed)
|
||||||
rows = self.db.multi_get(l)
|
if self.db_type == "postgresql":
|
||||||
|
async with self.db.acquire() as conn:
|
||||||
|
rows = await conn.fetch("SELECT outpoint, connector_utxo.data "
|
||||||
|
"FROM connector_utxo "
|
||||||
|
"WHERE outpoint = ANY($1);", l)
|
||||||
|
for row in rows:
|
||||||
|
d = row["data"]
|
||||||
|
pointer = c_int_to_int(d)
|
||||||
|
f = c_int_len(pointer)
|
||||||
|
amount = c_int_to_int(d[f:])
|
||||||
|
f += c_int_len(amount)
|
||||||
|
address = d[f:]
|
||||||
|
self.loaded[row["outpoint"]] = (pointer, amount, address)
|
||||||
|
self.loaded_utxo += 1
|
||||||
|
|
||||||
|
|
||||||
|
elif self.db_type == "rocksdb":
|
||||||
|
rows = self.db.multi_get(l)
|
||||||
|
for outpoint in rows:
|
||||||
|
d = rows[outpoint]
|
||||||
|
pointer = c_int_to_int(d)
|
||||||
|
f = c_int_len(pointer)
|
||||||
|
amount = c_int_to_int(d[f:])
|
||||||
|
f += c_int_len(amount)
|
||||||
|
address = d[f:]
|
||||||
|
self.loaded[outpoint] = (pointer, amount, address)
|
||||||
|
self.loaded_utxo += 1
|
||||||
|
else:
|
||||||
|
for outpoint in l:
|
||||||
|
d = self.db.get(outpoint)
|
||||||
|
if d is None: continue
|
||||||
|
pointer = c_int_to_int(d)
|
||||||
|
f = c_int_len(pointer)
|
||||||
|
amount = c_int_to_int(d[f:])
|
||||||
|
f += c_int_len(amount)
|
||||||
|
address = d[f:]
|
||||||
|
self.loaded[outpoint] = (pointer, amount, address)
|
||||||
|
self.loaded_utxo += 1
|
||||||
|
|
||||||
|
|
||||||
for i in l:
|
for i in l:
|
||||||
try:
|
try: self.missed.remove(i)
|
||||||
self.missed.remove(i)
|
except: pass
|
||||||
except:
|
|
||||||
pass
|
|
||||||
for outpoint in rows:
|
|
||||||
d = rows[outpoint]
|
|
||||||
pointer = c_int_to_int(d)
|
|
||||||
f = c_int_len(pointer)
|
|
||||||
amount = c_int_to_int(d[f:])
|
|
||||||
f += c_int_len(amount)
|
|
||||||
address = d[f:]
|
|
||||||
self.loaded[outpoint] = (pointer, amount, address)
|
|
||||||
self.loaded_utxo += 1
|
|
||||||
finally:
|
finally:
|
||||||
self.load_utxo_future.set_result(True)
|
self.load_utxo_future.set_result(True)
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user