From 888c6ed34629492ef7c68681a2bf4b8f7d653f99 Mon Sep 17 00:00:00 2001 From: 4tochka Date: Sat, 18 May 2019 00:27:27 +0400 Subject: [PATCH] connector --- pybtc/connector/connector.py | 83 +++++++++++++++++++++++++++++++++++- 1 file changed, 81 insertions(+), 2 deletions(-) diff --git a/pybtc/connector/connector.py b/pybtc/connector/connector.py index 4b245f9..486a218 100644 --- a/pybtc/connector/connector.py +++ b/pybtc/connector/connector.py @@ -19,6 +19,7 @@ class Connector: last_block_height=0, chain_tail=None, tx_handler=None, orphan_handler=None, before_block_handler=None, block_handler=None, after_block_handler=None, + block_batch_handler=None, block_timeout=30, deep_sync_limit=20, backlog=0, mempool_tx=True, rpc_batch_limit=50, rpc_threads_limit=100, rpc_timeout=100, @@ -43,6 +44,7 @@ class Connector: self.before_block_handler = before_block_handler self.block_handler = block_handler self.after_block_handler = after_block_handler + self.block_batch_handler = block_batch_handler self.deep_sync_limit = deep_sync_limit self.backlog = backlog self.mempool_tx = mempool_tx @@ -367,8 +369,10 @@ class Connector: if self.before_block_handler and not self.cache_loading: await self.before_block_handler(block) - - await self.fetch_block_transactions(block, tx_bin_list) + if self.block_batch_handler: + await self._block_as_transactions_batch(block) + else: + await self.fetch_block_transactions(block, tx_bin_list) if self.block_handler and not self.cache_loading: await self.block_handler(block) @@ -513,6 +517,81 @@ class Connector: rate = round(self.total_received_tx/self.total_received_tx_time) self.log.debug("Transactions received: %s [%s] received tx rate tx/s ->> %s <<" % (tx_count, time.time() - q, rate)) + async def _block_as_transactions_batch(self, block): + try: + + if self.utxo: + for q in block["rawTx"]: + tx = block["rawTx"][q] + for i in tx["vOut"]: + self.coins += 1 + if "_s_" in tx["vOut"][i]: + self.tt += 1 + else: + out = tx["vOut"][i] + if self.skip_opreturn and out["nType"] in (3, 8): + continue + pointer = (block["height"] << 42) + (block["height"] << 21) + i + try: + address = b"".join((bytes([out["nType"]]), out["scriptPubKey"])) + except: + address = b"".join((bytes([out["nType"]]), out["addressHash"])) + self.utxo.set(b"".join((tx["txId"], int_to_bytes(i))), pointer, out["value"], address) + + c = 0 + ti = 0 + stxo, missed = dict(), set() + for q in block["rawTx"]: + tx = block["rawTx"][q] + if not tx["coinbase"]: + if self.utxo: + for i in tx["vIn"]: + ti += 1 + self.destroyed_coins += 1 + inp = tx["vIn"][i] + outpoint = b"".join((inp["txId"], int_to_bytes(inp["vOut"]))) + tx["vIn"][i]["outpoint"] = outpoint + try: + tx["vIn"][i]["coin"] = inp["_a_"] + c += 1 + self.aa += 1 + except: + try: + tx["vIn"][i]["coin"] = inp["_c_"] + c += 1 + self.yy += 1 + try: + self.utxo.get(outpoint) + except: + self.utxo.deleted.add(outpoint) + except: + r = self.utxo.get(outpoint) + if r: + tx["vIn"][i]["coin"] = r + c += 1 + else: + missed.add((outpoint, (block["height"] << 42) + (q << 21) + i, q, i)) + + if missed: + await self.utxo.load_utxo() + for o, s, q, i in missed: + block["rawTx"][q]["vIn"][i]["coin"] = self.utxo.get_loaded(o) + c += 1 + + if c != ti and not self.cache_loading: + self.log.critical("utxo get failed " + rh2s(block["hash"])) + raise Exception("utxo get failed ") + + + if self.block_batch_handler and not self.cache_loading: + await self.block_batch_handler(tx, block) + + except Exception as err: + self.log.debug("new block error %s " % err) + self.log.debug(str(traceback.format_exc())) + finally: + pass + async def verify_block_position(self, block): if "previousblockhash" not in block : return