From 4ad03f3aecc237b38b613befdf3275d277b31c59 Mon Sep 17 00:00:00 2001 From: 4tochka Date: Wed, 8 May 2019 15:53:08 +0400 Subject: [PATCH] connector --- pybtc/connector/block_loader.py | 86 +++++++++++---------------------- 1 file changed, 28 insertions(+), 58 deletions(-) diff --git a/pybtc/connector/block_loader.py b/pybtc/connector/block_loader.py index a333529..c1a33c5 100644 --- a/pybtc/connector/block_loader.py +++ b/pybtc/connector/block_loader.py @@ -193,6 +193,8 @@ class Worker: self.loop.set_default_executor(ThreadPoolExecutor(20)) self.out_writer = out_writer self.in_reader = in_reader + self.coins = LRU(rpc_batch_limit * 10000) + self.destroyed_coins = LRU(rpc_batch_limit * 10000) signal.signal(signal.SIGTERM, self.terminate) self.loop.create_task(self.message_loop()) self.loop.run_forever() @@ -218,7 +220,32 @@ class Worker: blocks = dict() for x, y in zip(h, result): if y["result"] is not None: - blocks[x] = pickle.dumps(decode_block_tx(y["result"])) + block = decode_block_tx(y["result"]) + for z in block["tx"]: + tx = block["tx"][z] + for i in tx["vIn"]: + inp = tx["vIn"][i] + outpoint = b"".join((inp["txId"], int_to_bytes(inp["vOut"]))) + try: + r = self.coins[outpoint] + tx["vIn"][i]["coin"] = (outpoint, r[0], r[1], r[2]) + self.destroyed_coins[outpoint] = True + except: + pass + for i in tx["vOut"]: + out = tx["vOut"][i] + pointer = (x << 42) + (z << 21) + i + try: + address = out["scriptPubKey"] + except: + address = b"".join((bytes([out["nType"]]), out["addressHash"])) + o = b"".join((tx["txId"], int_to_bytes(i))) + self.coins[o] = (pointer, out["value"], address) + try: + out["_spent_"] = self.destroyed_coins[o] + except: pass + + blocks[x] = pickle.dumps(block) self.pipe_sent_msg(b'result', pickle.dumps(blocks)) async def message_loop(self): @@ -275,60 +302,3 @@ class Worker: self.out_writer.flush() - - -""" - - batch = list() - h_list = list() - while True: - batch.append(["getblockhash", height]) - h_list.append(height) - if len(batch) >= self.rpc_batch_limit or height >= max_height: - height += 1 - break - height += 1 - result = await self.rpc.batch(batch) - h = list() - batch = list() - for lh, r in zip(h_list, result): - try: - self.block_hashes.set(lh, r["result"]) - batch.append(["getblock", r["result"], 0]) - h.append(lh) - except: - pass - self.log.critical(">>>") - blocks = await self.block_loader.load_blocks(batch) - - for x,y in zip(h,blocks): - try: - self.block_preload.set(x, y) - except: - pass - except asyncio.CancelledError: - self.log.info("connector preload_block_hashes failed") - break - except: - pass - - if processed_height < self.last_block_height: - for i in range(processed_height, self.last_block_height ): - try: - self.block_preload.remove(i) - except: - pass - processed_height = self.last_block_height - if self.block_preload._store and next(iter(self.block_preload._store)) < processed_height + 1: - for i in range(next(iter(self.block_preload._store)), self.last_block_height+1): - try: - self.block_preload.remove(i) - except: - pass - if self.block_preload._store_size < self.block_preload_cache_limit * 0.9: - continue - - await asyncio.sleep(10) - # remove unused items - -"""