diff --git a/pybtc/connector.py b/pybtc/connector.py index ee4fa55..df35e06 100644 --- a/pybtc/connector.py +++ b/pybtc/connector.py @@ -1,8 +1,12 @@ from pybtc.functions.tools import rh2s, s2rh from pybtc.functions.tools import var_int_to_int from pybtc.functions.tools import read_var_int +from pybtc.functions.hash import double_sha256 from pybtc.transaction import Transaction +from pybtc.block import Block from pybtc import int_to_c_int, c_int_to_int, c_int_len, int_to_bytes +from pybtc.functions.block import bits_to_target, target_to_difficulty +from struct import unpack, pack import traceback import aiojsonrpc import zmq @@ -293,15 +297,12 @@ class Connector: self.deep_synchronization = False q = time.time() if self.deep_synchronization: - # self.log.critical(str(self.last_block_height + 1)) h = self.block_hashes.get(self.last_block_height + 1) - # self.log.critical(str(h)) if h is None: h = await self.rpc.getblockhash(self.last_block_height + 1) self.loop.create_task(self.preload_block_hashes()) else: h = await self.rpc.getblockhash(self.last_block_height + 1) - # self.log.critical(str(h)) self.blocks_download_time += time.time() - q await self._get_block_by_hash(h) except Exception as err: @@ -312,8 +313,10 @@ class Connector: async def _get_block_by_hash(self, hash): self.log.debug("get block by hash %s" % hash) try: - block = self.block_hashes.pop(hash) - if not block: + if self.deep_synchronization: + raw_block = await self.rpc.getblock(hash, 0) + block = decode_block_tx(raw_block) + else: q = time.time() block = await self.rpc.getblock(hash) self.blocks_download_time += time.time() - q @@ -330,7 +333,10 @@ class Connector: self.cache_loading = True if self.last_block_height < self.app_block_height_on_start else False try: - tx_bin_list = [s2rh(h) for h in block["tx"]] + if self.deep_synchronization: + tx_bin_list = [block["rawTx"][i]["txId"] for i in block["rawTx"]] + else: + tx_bin_list = [s2rh(h) for h in block["tx"]] await self.verify_block_position(block) if self.before_block_handler and not self.cache_loading: @@ -385,36 +391,39 @@ class Connector: self.loop.create_task(self.get_next_block()) async def fetch_block_transactions(self, block, tx_bin_list): - if not self.deep_synchronization: - missed = set() - for h in tx_bin_list: - if self.tx_cache.get(h) is None: - missed.add(h) - else: - missed = list(tx_bin_list) - self.log.debug("Transactions missed %s" % len(missed)) - q = time.time() - if missed: - self.missed_tx = set(missed) - self.await_tx = set(missed) - self.await_tx_future = {i: asyncio.Future() for i in missed} + if self.deep_synchronization: + self.await_tx = set(tx_bin_list) + self.await_tx_future = {i: asyncio.Future() for i in tx_bin_list} self.block_txs_request = asyncio.Future() - if self.deep_synchronization or self.mempool_tx == False: - self.loop.create_task(self._get_missed(block["hash"], block["time"], block["height"])) - else: + for i in block["rawTx"]: + self.loop.create_task(self._new_transaction(block["rawTx"][i], + block["time"], + block["height"], + i)) + await asyncio.wait_for(self.block_txs_request, timeout=self.block_timeout) + + elif tx_bin_list: + raise Exception("not emplemted") + missed = list(tx_bin_list) + self.log.debug("Transactions missed %s" % len(missed)) + + if missed: + self.missed_tx = set(missed) + self.await_tx = set(missed) + self.await_tx_future = {i: asyncio.Future() for i in missed} + self.block_txs_request = asyncio.Future() self.loop.create_task(self._get_missed(False, block["time"], block["height"])) - try: - await asyncio.wait_for(self.block_txs_request, timeout=self.block_timeout) - except asyncio.CancelledError: - # refresh rpc connection session - await self.rpc.close() - self.rpc = aiojsonrpc.rpc(self.rpc_url, self.loop, timeout=self.rpc_timeout) - raise RuntimeError("block transaction request timeout") + try: + await asyncio.wait_for(self.block_txs_request, timeout=self.block_timeout) + except asyncio.CancelledError: + # refresh rpc connection session + await self.rpc.close() + self.rpc = aiojsonrpc.rpc(self.rpc_url, self.loop, timeout=self.rpc_timeout) + raise RuntimeError("block transaction request timeout") tx_count = len(block["tx"]) self.total_received_tx += tx_count self.total_received_tx_time += time.time() - q - rate = round(self.total_received_tx/self.total_received_tx_time) self.log.debug("Transactions received: %s [%s] received tx rate tx/s ->> %s <<" % (tx_count, time.time() - q, rate)) @@ -835,9 +844,25 @@ def get_stream(stream): def decode_block_tx(block): - stream = get_stream(block) - stream.seek(80) - return {i: Transaction(stream, format="raw") for i in range(var_int_to_int(read_var_int(stream)))} + s = get_stream(block) + b = dict() + b["version"] = unpack("L", b["version"]).hex() + b["previousBlockHash"] = rh2s(s.read(32)) + b["merkleRoot"] = rh2s(s.read(32)) + b["time"] = unpack("