diff --git a/pybtc/connector/block_loader.py b/pybtc/connector/block_loader.py index e91e044..5ad74a0 100644 --- a/pybtc/connector/block_loader.py +++ b/pybtc/connector/block_loader.py @@ -9,6 +9,7 @@ import signal import sys import aiojsonrpc import traceback +from pybtc.connector.utils import decode_block_tx import pickle class BlockLoader: @@ -73,6 +74,7 @@ class BlockLoader: self.log.error("Loading task error %s " % err) else: await asyncio.sleep(1) + [p.terminate() for p in self.worker_tasks] @@ -149,7 +151,7 @@ class BlockLoader: continue if msg_type == b'result': - msg + self.log.critical(str(len(msg))) continue @@ -171,7 +173,6 @@ class Worker: self.rpc_timeout = rpc_timeout self.rpc_batch_limit = rpc_batch_limit self.name = name - in_writer.close() out_reader.close() policy = asyncio.get_event_loop_policy() @@ -186,6 +187,30 @@ class Worker: self.loop.create_task(self.message_loop()) self.loop.run_forever() + async def load_blocks(self, height): + batch = list() + h_list = list() + while True: + batch.append(["getblockhash", height]) + h_list.append(height) + if len(batch) >= self.rpc_batch_limit: + height += 1 + break + height += 1 + result = await self.rpc.batch(batch) + h = list() + batch = list() + for lh, r in zip(h_list, result): + if r["result"] is not None: + batch.append(["getblock", r["result"], 0]) + h.append(lh) + result = await self.rpc.batch(batch) + blocks = dict() + for x, y in zip(h, result): + if y["result"] is not None: + blocks[x] = decode_block_tx(y["result"]) + self.pipe_sent_msg(b'result', pickle.dumps(blocks)) + async def message_loop(self): try: self.rpc = aiojsonrpc.rpc(self.rpc_url, self.loop, timeout=self.rpc_timeout) @@ -196,7 +221,7 @@ class Worker: return if msg_type == b'get': - self.log.critical(str(bytes_to_int(msg))) + self.loop.create_task(self.load_blocks(bytes_to_int(msg))) continue except: self.log.critical("exc") @@ -234,12 +259,12 @@ class Worker: except: return b'pipe_read_error', b'' - def pipe_sent_msg(self, writer, msg_type, msg): + def pipe_sent_msg(self, msg_type, msg): msg_type = msg_type[:20].ljust(20) msg = msg_type + msg msg = b''.join((b'ME', len(msg).to_bytes(4, byteorder='little'), msg)) - writer.write(msg) - writer.flush() + self.out_writer.write(msg) + self.out_writer.flush()