From afcb02f5563d19573bcfa85e3d59f4daf5a80b63 Mon Sep 17 00:00:00 2001 From: 4tochka Date: Sat, 1 Jun 2019 19:11:49 +0300 Subject: [PATCH] connector --- pybtc/connector/block_loader.py | 7 +++-- pybtc/connector/connector.py | 48 ++++++++++++++++++++++----------- pybtc/connector/utxo.py | 29 +++++++++++--------- 3 files changed, 53 insertions(+), 31 deletions(-) diff --git a/pybtc/connector/block_loader.py b/pybtc/connector/block_loader.py index 3cf674e..10971a4 100644 --- a/pybtc/connector/block_loader.py +++ b/pybtc/connector/block_loader.py @@ -61,7 +61,7 @@ class BlockLoader: async def loading(self): - self.last_batch_size = 10 + self.rpc_batch_limit = 10 self.worker_tasks = [self.loop.create_task(self.start_worker(i)) for i in range(self.worker_limit)] target_height = self.parent.node_last_block - self.parent.deep_sync_limit height = self.parent.last_block_height + 1 @@ -70,12 +70,15 @@ class BlockLoader: new_requests = 0 if self.parent.block_preload._store_size < self.parent.block_preload_cache_limit: try: - if self.last_batch_size < 1000000: + if self.last_batch_size < 1000000 and self.rpc_batch_limit < 450: self.rpc_batch_limit += 50 + self.log.warning("rpc batch limit %s " % self.rpc_batch_limit) elif self.last_batch_size > 40000000 and self.rpc_batch_limit > 10: self.rpc_batch_limit = 30 + self.log.warning("rpc batch limit %s " % self.rpc_batch_limit) if self.last_batch_size > 80000000 and self.rpc_batch_limit > 10: self.rpc_batch_limit = 20 + self.log.warning("rpc batch limit %s " % self.rpc_batch_limit) for i in self.worker_busy: if not self.worker_busy[i]: self.worker_busy[i] = True diff --git a/pybtc/connector/connector.py b/pybtc/connector/connector.py index 889b4c3..c5e30e7 100644 --- a/pybtc/connector/connector.py +++ b/pybtc/connector/connector.py @@ -15,6 +15,7 @@ import time from _pickle import loads class Connector: + def __init__(self, node_rpc_url, node_zerromq_url, logger, last_block_height=0, chain_tail=None, tx_handler=None, orphan_handler=None, @@ -84,9 +85,9 @@ class Connector: self.total_received_tx_time = 0 self.coins = 0 self.destroyed_coins = 0 - self.tt = 0 - self.yy = 0 - self.aa = 0 + self.preload_cached_total = 0 + self.preload_cached = 0 + self.preload_cached_annihilated = 0 self.start_time = time.time() # cache and system @@ -188,7 +189,9 @@ class Connector: async with self.db.acquire() as conn: await conn.execute("""CREATE TABLE IF NOT EXISTS connector_utxo (outpoint BYTEA, - data BYTEA, + pointer BIGINT, + address BYTEA, + amount BIGINT, PRIMARY KEY(outpoint)); """) await conn.execute("""CREATE TABLE IF NOT EXISTS @@ -448,17 +451,30 @@ class Connector: next(reversed(self.block_preload._store)))) self.log.debug("utxo checkpoint block %s; " - "saved utxo %s; " - "deleted utxo %s; " - "loaded utxo %s; "% (self.utxo.last_saved_block, + "saved utxo %s; " + "deleted utxo %s; " + "loaded utxo %s; "% (self.utxo.last_saved_block, self.utxo.saved_utxo, self.utxo.deleted_utxo, self.utxo.loaded_utxo )) - self.log.debug("Preload coins cached/destoyed -> %s-%s [%s];" % (self.yy, self.aa, self.tt)) + self.log.debug("Preload coins cache -> %s:%s [%s] " + "preload cache effectivity %s ;" % (self.preload_cached, + self.preload_cached_annihilated, + self.preload_cached_total, + round(self.preload_cached_total + / self.destroyed_coins, 4))) self.log.debug("Coins %s; destroyed %s; unspent %s;" % (self.coins, - self.destroyed_coins, - self.coins - self.destroyed_coins)) + self.destroyed_coins, + self.coins - self.destroyed_coins)) + self.log.debug("Coins destroyed in cache %s; " + "cache effectivity [+preload cache] %s [%s];" % (self.utxo.deleted_utxo, + round(self.preload_cached_total + / self.destroyed_coins, 4), + round((self.preload_cached_total + + self.preload_cached_total) + / self.destroyed_coins, 4))) + self.log.debug("total tx fetch time %s;" % self.total_received_tx_time) self.log.debug("total blocks processing time %s;" % self.blocks_processing_time) t = int(time.time() - self.start_time) @@ -545,7 +561,7 @@ class Connector: for i in tx["vOut"]: self.coins += 1 if "_s_" in tx["vOut"][i]: - self.tt += 1 + self.preload_cached_total += 1 else: out = tx["vOut"][i] if self.skip_opreturn and out["nType"] in (3, 8): @@ -573,12 +589,12 @@ class Connector: try: tx["vIn"][i]["coin"] = inp["_a_"] c += 1 - self.aa += 1 + self.preload_cached_annihilated += 1 except: try: tx["vIn"][i]["coin"] = inp["_c_"] c += 1 - self.yy += 1 + self.preload_cached += 1 try: self.utxo.get(outpoint) except: @@ -714,12 +730,12 @@ class Connector: try: tx["vIn"][i]["coin"] = inp["_a_"] c += 1 - self.aa += 1 + self.preload_cached_anigilated += 1 except: try: tx["vIn"][i]["coin"] = inp["_c_"] c += 1 - self.yy += 1 + self.preload_cached += 1 try: self.utxo.get(outpoint) except: @@ -750,7 +766,7 @@ class Connector: for i in tx["vOut"]: self.coins += 1 if "_s_" in tx["vOut"][i]: - self.tt += 1 + self.preload_cached_total += 1 else: out = tx["vOut"][i] if self.skip_opreturn and out["nType"] in (3, 8): diff --git a/pybtc/connector/utxo.py b/pybtc/connector/utxo.py index 0637115..4e8205e 100644 --- a/pybtc/connector/utxo.py +++ b/pybtc/connector/utxo.py @@ -41,7 +41,7 @@ class UTXO(): self.saved_utxo = 0 self.deleted_utxo = 0 self.deleted_last_block = 0 - self.deleted_utxo_saved = 0 + self.deleted_utxo = 0 self.loaded_utxo = 0 self.destroyed_utxo = 0 self.destroyed_utxo_block = 0 @@ -85,9 +85,8 @@ class UTXO(): if len(self.cached) <= self.size_limit: if block_changed and checkpoint_found: break - self.pending_utxo.add((i[0],b"".join((int_to_c_int(i[1][0]), - int_to_c_int(i[1][1]), - i[1][2])))) + self.pending_utxo.add((i[0], i[1][0], i[1][2], i[1][1])) + self.pending_saved[i[0]] = i[1] if block_changed: self.cached.append({i[0]: i[1]}) @@ -121,9 +120,14 @@ class UTXO(): if self.pending_deleted: await conn.execute("DELETE FROM connector_utxo WHERE " "outpoint = ANY($1);", self.pending_deleted) + self.deleted_utxo += len(self.pending_deleted) if self.pending_utxo: await conn.copy_records_to_table('connector_utxo', - columns=["outpoint", "data"], records=self.pending_utxo) + columns=["outpoint", + "pointer", + "address", + "amount"], + records=self.pending_utxo) await conn.execute("UPDATE connector_utxo_state SET value = $1 " "WHERE name = 'last_block';", self.checkpoint) await conn.execute("UPDATE connector_utxo_state SET value = $1 " @@ -193,17 +197,16 @@ class UTXO(): l = list(self.missed) if self.db_type == "postgresql": async with self.db.acquire() as conn: - rows = await conn.fetch("SELECT outpoint, connector_utxo.data " + rows = await conn.fetch("SELECT outpoint, " + " pointer," + " address," + " amount " "FROM connector_utxo " "WHERE outpoint = ANY($1);", l) for row in rows: - d = row["data"] - pointer = c_int_to_int(d) - f = c_int_len(pointer) - amount = c_int_to_int(d[f:]) - f += c_int_len(amount) - address = d[f:] - self.loaded[row["outpoint"]] = (pointer, amount, address) + self.loaded[row["outpoint"]] = (row["pointer"], + row["amount"], + row["address"]) self.loaded_utxo += 1