connector

This commit is contained in:
4tochka 2019-05-18 00:27:27 +04:00
parent bc6c422e6d
commit 888c6ed346

View File

@ -19,6 +19,7 @@ class Connector:
last_block_height=0, chain_tail=None,
tx_handler=None, orphan_handler=None,
before_block_handler=None, block_handler=None, after_block_handler=None,
block_batch_handler=None,
block_timeout=30,
deep_sync_limit=20, backlog=0, mempool_tx=True,
rpc_batch_limit=50, rpc_threads_limit=100, rpc_timeout=100,
@ -43,6 +44,7 @@ class Connector:
self.before_block_handler = before_block_handler
self.block_handler = block_handler
self.after_block_handler = after_block_handler
self.block_batch_handler = block_batch_handler
self.deep_sync_limit = deep_sync_limit
self.backlog = backlog
self.mempool_tx = mempool_tx
@ -367,8 +369,10 @@ class Connector:
if self.before_block_handler and not self.cache_loading:
await self.before_block_handler(block)
await self.fetch_block_transactions(block, tx_bin_list)
if self.block_batch_handler:
await self._block_as_transactions_batch(block)
else:
await self.fetch_block_transactions(block, tx_bin_list)
if self.block_handler and not self.cache_loading:
await self.block_handler(block)
@ -513,6 +517,81 @@ class Connector:
rate = round(self.total_received_tx/self.total_received_tx_time)
self.log.debug("Transactions received: %s [%s] received tx rate tx/s ->> %s <<" % (tx_count, time.time() - q, rate))
async def _block_as_transactions_batch(self, block):
try:
if self.utxo:
for q in block["rawTx"]:
tx = block["rawTx"][q]
for i in tx["vOut"]:
self.coins += 1
if "_s_" in tx["vOut"][i]:
self.tt += 1
else:
out = tx["vOut"][i]
if self.skip_opreturn and out["nType"] in (3, 8):
continue
pointer = (block["height"] << 42) + (block["height"] << 21) + i
try:
address = b"".join((bytes([out["nType"]]), out["scriptPubKey"]))
except:
address = b"".join((bytes([out["nType"]]), out["addressHash"]))
self.utxo.set(b"".join((tx["txId"], int_to_bytes(i))), pointer, out["value"], address)
c = 0
ti = 0
stxo, missed = dict(), set()
for q in block["rawTx"]:
tx = block["rawTx"][q]
if not tx["coinbase"]:
if self.utxo:
for i in tx["vIn"]:
ti += 1
self.destroyed_coins += 1
inp = tx["vIn"][i]
outpoint = b"".join((inp["txId"], int_to_bytes(inp["vOut"])))
tx["vIn"][i]["outpoint"] = outpoint
try:
tx["vIn"][i]["coin"] = inp["_a_"]
c += 1
self.aa += 1
except:
try:
tx["vIn"][i]["coin"] = inp["_c_"]
c += 1
self.yy += 1
try:
self.utxo.get(outpoint)
except:
self.utxo.deleted.add(outpoint)
except:
r = self.utxo.get(outpoint)
if r:
tx["vIn"][i]["coin"] = r
c += 1
else:
missed.add((outpoint, (block["height"] << 42) + (q << 21) + i, q, i))
if missed:
await self.utxo.load_utxo()
for o, s, q, i in missed:
block["rawTx"][q]["vIn"][i]["coin"] = self.utxo.get_loaded(o)
c += 1
if c != ti and not self.cache_loading:
self.log.critical("utxo get failed " + rh2s(block["hash"]))
raise Exception("utxo get failed ")
if self.block_batch_handler and not self.cache_loading:
await self.block_batch_handler(tx, block)
except Exception as err:
self.log.debug("new block error %s " % err)
self.log.debug(str(traceback.format_exc()))
finally:
pass
async def verify_block_position(self, block):
if "previousblockhash" not in block :
return