connector

This commit is contained in:
4tochka 2019-05-01 22:51:09 +04:00
parent d82190f4fd
commit abfd3d9453

View File

@ -1,8 +1,12 @@
from pybtc.functions.tools import rh2s, s2rh
from pybtc.functions.tools import var_int_to_int
from pybtc.functions.tools import read_var_int
from pybtc.functions.hash import double_sha256
from pybtc.transaction import Transaction
from pybtc.block import Block
from pybtc import int_to_c_int, c_int_to_int, c_int_len, int_to_bytes
from pybtc.functions.block import bits_to_target, target_to_difficulty
from struct import unpack, pack
import traceback
import aiojsonrpc
import zmq
@ -293,15 +297,12 @@ class Connector:
self.deep_synchronization = False
q = time.time()
if self.deep_synchronization:
# self.log.critical(str(self.last_block_height + 1))
h = self.block_hashes.get(self.last_block_height + 1)
# self.log.critical(str(h))
if h is None:
h = await self.rpc.getblockhash(self.last_block_height + 1)
self.loop.create_task(self.preload_block_hashes())
else:
h = await self.rpc.getblockhash(self.last_block_height + 1)
# self.log.critical(str(h))
self.blocks_download_time += time.time() - q
await self._get_block_by_hash(h)
except Exception as err:
@ -312,8 +313,10 @@ class Connector:
async def _get_block_by_hash(self, hash):
self.log.debug("get block by hash %s" % hash)
try:
block = self.block_hashes.pop(hash)
if not block:
if self.deep_synchronization:
raw_block = await self.rpc.getblock(hash, 0)
block = decode_block_tx(raw_block)
else:
q = time.time()
block = await self.rpc.getblock(hash)
self.blocks_download_time += time.time() - q
@ -330,7 +333,10 @@ class Connector:
self.cache_loading = True if self.last_block_height < self.app_block_height_on_start else False
try:
tx_bin_list = [s2rh(h) for h in block["tx"]]
if self.deep_synchronization:
tx_bin_list = [block["rawTx"][i]["txId"] for i in block["rawTx"]]
else:
tx_bin_list = [s2rh(h) for h in block["tx"]]
await self.verify_block_position(block)
if self.before_block_handler and not self.cache_loading:
@ -385,36 +391,39 @@ class Connector:
self.loop.create_task(self.get_next_block())
async def fetch_block_transactions(self, block, tx_bin_list):
if not self.deep_synchronization:
missed = set()
for h in tx_bin_list:
if self.tx_cache.get(h) is None:
missed.add(h)
else:
missed = list(tx_bin_list)
self.log.debug("Transactions missed %s" % len(missed))
q = time.time()
if missed:
self.missed_tx = set(missed)
self.await_tx = set(missed)
self.await_tx_future = {i: asyncio.Future() for i in missed}
if self.deep_synchronization:
self.await_tx = set(tx_bin_list)
self.await_tx_future = {i: asyncio.Future() for i in tx_bin_list}
self.block_txs_request = asyncio.Future()
if self.deep_synchronization or self.mempool_tx == False:
self.loop.create_task(self._get_missed(block["hash"], block["time"], block["height"]))
else:
for i in block["rawTx"]:
self.loop.create_task(self._new_transaction(block["rawTx"][i],
block["time"],
block["height"],
i))
await asyncio.wait_for(self.block_txs_request, timeout=self.block_timeout)
elif tx_bin_list:
raise Exception("not emplemted")
missed = list(tx_bin_list)
self.log.debug("Transactions missed %s" % len(missed))
if missed:
self.missed_tx = set(missed)
self.await_tx = set(missed)
self.await_tx_future = {i: asyncio.Future() for i in missed}
self.block_txs_request = asyncio.Future()
self.loop.create_task(self._get_missed(False, block["time"], block["height"]))
try:
await asyncio.wait_for(self.block_txs_request, timeout=self.block_timeout)
except asyncio.CancelledError:
# refresh rpc connection session
await self.rpc.close()
self.rpc = aiojsonrpc.rpc(self.rpc_url, self.loop, timeout=self.rpc_timeout)
raise RuntimeError("block transaction request timeout")
try:
await asyncio.wait_for(self.block_txs_request, timeout=self.block_timeout)
except asyncio.CancelledError:
# refresh rpc connection session
await self.rpc.close()
self.rpc = aiojsonrpc.rpc(self.rpc_url, self.loop, timeout=self.rpc_timeout)
raise RuntimeError("block transaction request timeout")
tx_count = len(block["tx"])
self.total_received_tx += tx_count
self.total_received_tx_time += time.time() - q
rate = round(self.total_received_tx/self.total_received_tx_time)
self.log.debug("Transactions received: %s [%s] received tx rate tx/s ->> %s <<" % (tx_count, time.time() - q, rate))
@ -835,9 +844,25 @@ def get_stream(stream):
def decode_block_tx(block):
stream = get_stream(block)
stream.seek(80)
return {i: Transaction(stream, format="raw") for i in range(var_int_to_int(read_var_int(stream)))}
s = get_stream(block)
b = dict()
b["version"] = unpack("<L", s.read(4))[0]
b["versionHex"] = pack(">L", b["version"]).hex()
b["previousBlockHash"] = rh2s(s.read(32))
b["merkleRoot"] = rh2s(s.read(32))
b["time"] = unpack("<L", s.read(4))[0]
b["bits"] = rh2s(s.read(4))
b["target"] = rh2s(bits_to_target(unpack("<L", b["bits"])[0]))
b["targetDifficulty"] = target_to_difficulty(b["target"])
b["target"] = b["target"].to_bytes(32, byteorder="little")
b["nonce"] = unpack("<L", s.read(4))[0]
s.seek(-80, 1)
b["header"] = s.read(80).hex()
b["hash"] = double_sha256(b["header"], hex=1)
b["rawTx"] = {i: Transaction(s, format="raw")
for i in range(var_int_to_int(read_var_int(s)))}
b["tx"] = [rh2s(b["rawTx"][i]["txId"]) for i in b["rawTx"] ]
return b
class DependsTransaction(Exception):