connector
This commit is contained in:
parent
429d14e062
commit
afcb02f556
@ -61,7 +61,7 @@ class BlockLoader:
|
||||
|
||||
|
||||
async def loading(self):
|
||||
self.last_batch_size = 10
|
||||
self.rpc_batch_limit = 10
|
||||
self.worker_tasks = [self.loop.create_task(self.start_worker(i)) for i in range(self.worker_limit)]
|
||||
target_height = self.parent.node_last_block - self.parent.deep_sync_limit
|
||||
height = self.parent.last_block_height + 1
|
||||
@ -70,12 +70,15 @@ class BlockLoader:
|
||||
new_requests = 0
|
||||
if self.parent.block_preload._store_size < self.parent.block_preload_cache_limit:
|
||||
try:
|
||||
if self.last_batch_size < 1000000:
|
||||
if self.last_batch_size < 1000000 and self.rpc_batch_limit < 450:
|
||||
self.rpc_batch_limit += 50
|
||||
self.log.warning("rpc batch limit %s " % self.rpc_batch_limit)
|
||||
elif self.last_batch_size > 40000000 and self.rpc_batch_limit > 10:
|
||||
self.rpc_batch_limit = 30
|
||||
self.log.warning("rpc batch limit %s " % self.rpc_batch_limit)
|
||||
if self.last_batch_size > 80000000 and self.rpc_batch_limit > 10:
|
||||
self.rpc_batch_limit = 20
|
||||
self.log.warning("rpc batch limit %s " % self.rpc_batch_limit)
|
||||
for i in self.worker_busy:
|
||||
if not self.worker_busy[i]:
|
||||
self.worker_busy[i] = True
|
||||
|
||||
@ -15,6 +15,7 @@ import time
|
||||
from _pickle import loads
|
||||
|
||||
class Connector:
|
||||
|
||||
def __init__(self, node_rpc_url, node_zerromq_url, logger,
|
||||
last_block_height=0, chain_tail=None,
|
||||
tx_handler=None, orphan_handler=None,
|
||||
@ -84,9 +85,9 @@ class Connector:
|
||||
self.total_received_tx_time = 0
|
||||
self.coins = 0
|
||||
self.destroyed_coins = 0
|
||||
self.tt = 0
|
||||
self.yy = 0
|
||||
self.aa = 0
|
||||
self.preload_cached_total = 0
|
||||
self.preload_cached = 0
|
||||
self.preload_cached_annihilated = 0
|
||||
self.start_time = time.time()
|
||||
|
||||
# cache and system
|
||||
@ -188,7 +189,9 @@ class Connector:
|
||||
async with self.db.acquire() as conn:
|
||||
await conn.execute("""CREATE TABLE IF NOT EXISTS
|
||||
connector_utxo (outpoint BYTEA,
|
||||
data BYTEA,
|
||||
pointer BIGINT,
|
||||
address BYTEA,
|
||||
amount BIGINT,
|
||||
PRIMARY KEY(outpoint));
|
||||
""")
|
||||
await conn.execute("""CREATE TABLE IF NOT EXISTS
|
||||
@ -448,17 +451,30 @@ class Connector:
|
||||
next(reversed(self.block_preload._store))))
|
||||
|
||||
self.log.debug("utxo checkpoint block %s; "
|
||||
"saved utxo %s; "
|
||||
"deleted utxo %s; "
|
||||
"loaded utxo %s; "% (self.utxo.last_saved_block,
|
||||
"saved utxo %s; "
|
||||
"deleted utxo %s; "
|
||||
"loaded utxo %s; "% (self.utxo.last_saved_block,
|
||||
self.utxo.saved_utxo,
|
||||
self.utxo.deleted_utxo,
|
||||
self.utxo.loaded_utxo
|
||||
))
|
||||
self.log.debug("Preload coins cached/destoyed -> %s-%s [%s];" % (self.yy, self.aa, self.tt))
|
||||
self.log.debug("Preload coins cache -> %s:%s [%s] "
|
||||
"preload cache effectivity %s ;" % (self.preload_cached,
|
||||
self.preload_cached_annihilated,
|
||||
self.preload_cached_total,
|
||||
round(self.preload_cached_total
|
||||
/ self.destroyed_coins, 4)))
|
||||
self.log.debug("Coins %s; destroyed %s; unspent %s;" % (self.coins,
|
||||
self.destroyed_coins,
|
||||
self.coins - self.destroyed_coins))
|
||||
self.destroyed_coins,
|
||||
self.coins - self.destroyed_coins))
|
||||
self.log.debug("Coins destroyed in cache %s; "
|
||||
"cache effectivity [+preload cache] %s [%s];" % (self.utxo.deleted_utxo,
|
||||
round(self.preload_cached_total
|
||||
/ self.destroyed_coins, 4),
|
||||
round((self.preload_cached_total
|
||||
+ self.preload_cached_total)
|
||||
/ self.destroyed_coins, 4)))
|
||||
|
||||
self.log.debug("total tx fetch time %s;" % self.total_received_tx_time)
|
||||
self.log.debug("total blocks processing time %s;" % self.blocks_processing_time)
|
||||
t = int(time.time() - self.start_time)
|
||||
@ -545,7 +561,7 @@ class Connector:
|
||||
for i in tx["vOut"]:
|
||||
self.coins += 1
|
||||
if "_s_" in tx["vOut"][i]:
|
||||
self.tt += 1
|
||||
self.preload_cached_total += 1
|
||||
else:
|
||||
out = tx["vOut"][i]
|
||||
if self.skip_opreturn and out["nType"] in (3, 8):
|
||||
@ -573,12 +589,12 @@ class Connector:
|
||||
try:
|
||||
tx["vIn"][i]["coin"] = inp["_a_"]
|
||||
c += 1
|
||||
self.aa += 1
|
||||
self.preload_cached_annihilated += 1
|
||||
except:
|
||||
try:
|
||||
tx["vIn"][i]["coin"] = inp["_c_"]
|
||||
c += 1
|
||||
self.yy += 1
|
||||
self.preload_cached += 1
|
||||
try:
|
||||
self.utxo.get(outpoint)
|
||||
except:
|
||||
@ -714,12 +730,12 @@ class Connector:
|
||||
try:
|
||||
tx["vIn"][i]["coin"] = inp["_a_"]
|
||||
c += 1
|
||||
self.aa += 1
|
||||
self.preload_cached_anigilated += 1
|
||||
except:
|
||||
try:
|
||||
tx["vIn"][i]["coin"] = inp["_c_"]
|
||||
c += 1
|
||||
self.yy += 1
|
||||
self.preload_cached += 1
|
||||
try:
|
||||
self.utxo.get(outpoint)
|
||||
except:
|
||||
@ -750,7 +766,7 @@ class Connector:
|
||||
for i in tx["vOut"]:
|
||||
self.coins += 1
|
||||
if "_s_" in tx["vOut"][i]:
|
||||
self.tt += 1
|
||||
self.preload_cached_total += 1
|
||||
else:
|
||||
out = tx["vOut"][i]
|
||||
if self.skip_opreturn and out["nType"] in (3, 8):
|
||||
|
||||
@ -41,7 +41,7 @@ class UTXO():
|
||||
self.saved_utxo = 0
|
||||
self.deleted_utxo = 0
|
||||
self.deleted_last_block = 0
|
||||
self.deleted_utxo_saved = 0
|
||||
self.deleted_utxo = 0
|
||||
self.loaded_utxo = 0
|
||||
self.destroyed_utxo = 0
|
||||
self.destroyed_utxo_block = 0
|
||||
@ -85,9 +85,8 @@ class UTXO():
|
||||
if len(self.cached) <= self.size_limit:
|
||||
if block_changed and checkpoint_found:
|
||||
break
|
||||
self.pending_utxo.add((i[0],b"".join((int_to_c_int(i[1][0]),
|
||||
int_to_c_int(i[1][1]),
|
||||
i[1][2]))))
|
||||
self.pending_utxo.add((i[0], i[1][0], i[1][2], i[1][1]))
|
||||
|
||||
self.pending_saved[i[0]] = i[1]
|
||||
if block_changed:
|
||||
self.cached.append({i[0]: i[1]})
|
||||
@ -121,9 +120,14 @@ class UTXO():
|
||||
if self.pending_deleted:
|
||||
await conn.execute("DELETE FROM connector_utxo WHERE "
|
||||
"outpoint = ANY($1);", self.pending_deleted)
|
||||
self.deleted_utxo += len(self.pending_deleted)
|
||||
if self.pending_utxo:
|
||||
await conn.copy_records_to_table('connector_utxo',
|
||||
columns=["outpoint", "data"], records=self.pending_utxo)
|
||||
columns=["outpoint",
|
||||
"pointer",
|
||||
"address",
|
||||
"amount"],
|
||||
records=self.pending_utxo)
|
||||
await conn.execute("UPDATE connector_utxo_state SET value = $1 "
|
||||
"WHERE name = 'last_block';", self.checkpoint)
|
||||
await conn.execute("UPDATE connector_utxo_state SET value = $1 "
|
||||
@ -193,17 +197,16 @@ class UTXO():
|
||||
l = list(self.missed)
|
||||
if self.db_type == "postgresql":
|
||||
async with self.db.acquire() as conn:
|
||||
rows = await conn.fetch("SELECT outpoint, connector_utxo.data "
|
||||
rows = await conn.fetch("SELECT outpoint, "
|
||||
" pointer,"
|
||||
" address,"
|
||||
" amount "
|
||||
"FROM connector_utxo "
|
||||
"WHERE outpoint = ANY($1);", l)
|
||||
for row in rows:
|
||||
d = row["data"]
|
||||
pointer = c_int_to_int(d)
|
||||
f = c_int_len(pointer)
|
||||
amount = c_int_to_int(d[f:])
|
||||
f += c_int_len(amount)
|
||||
address = d[f:]
|
||||
self.loaded[row["outpoint"]] = (pointer, amount, address)
|
||||
self.loaded[row["outpoint"]] = (row["pointer"],
|
||||
row["amount"],
|
||||
row["address"])
|
||||
self.loaded_utxo += 1
|
||||
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user