connector

This commit is contained in:
4tochka 2019-05-08 15:53:08 +04:00
parent d8f327cf94
commit 4ad03f3aec

View File

@ -193,6 +193,8 @@ class Worker:
self.loop.set_default_executor(ThreadPoolExecutor(20))
self.out_writer = out_writer
self.in_reader = in_reader
self.coins = LRU(rpc_batch_limit * 10000)
self.destroyed_coins = LRU(rpc_batch_limit * 10000)
signal.signal(signal.SIGTERM, self.terminate)
self.loop.create_task(self.message_loop())
self.loop.run_forever()
@ -218,7 +220,32 @@ class Worker:
blocks = dict()
for x, y in zip(h, result):
if y["result"] is not None:
blocks[x] = pickle.dumps(decode_block_tx(y["result"]))
block = decode_block_tx(y["result"])
for z in block["tx"]:
tx = block["tx"][z]
for i in tx["vIn"]:
inp = tx["vIn"][i]
outpoint = b"".join((inp["txId"], int_to_bytes(inp["vOut"])))
try:
r = self.coins[outpoint]
tx["vIn"][i]["coin"] = (outpoint, r[0], r[1], r[2])
self.destroyed_coins[outpoint] = True
except:
pass
for i in tx["vOut"]:
out = tx["vOut"][i]
pointer = (x << 42) + (z << 21) + i
try:
address = out["scriptPubKey"]
except:
address = b"".join((bytes([out["nType"]]), out["addressHash"]))
o = b"".join((tx["txId"], int_to_bytes(i)))
self.coins[o] = (pointer, out["value"], address)
try:
out["_spent_"] = self.destroyed_coins[o]
except: pass
blocks[x] = pickle.dumps(block)
self.pipe_sent_msg(b'result', pickle.dumps(blocks))
async def message_loop(self):
@ -275,60 +302,3 @@ class Worker:
self.out_writer.flush()
"""
batch = list()
h_list = list()
while True:
batch.append(["getblockhash", height])
h_list.append(height)
if len(batch) >= self.rpc_batch_limit or height >= max_height:
height += 1
break
height += 1
result = await self.rpc.batch(batch)
h = list()
batch = list()
for lh, r in zip(h_list, result):
try:
self.block_hashes.set(lh, r["result"])
batch.append(["getblock", r["result"], 0])
h.append(lh)
except:
pass
self.log.critical(">>>")
blocks = await self.block_loader.load_blocks(batch)
for x,y in zip(h,blocks):
try:
self.block_preload.set(x, y)
except:
pass
except asyncio.CancelledError:
self.log.info("connector preload_block_hashes failed")
break
except:
pass
if processed_height < self.last_block_height:
for i in range(processed_height, self.last_block_height ):
try:
self.block_preload.remove(i)
except:
pass
processed_height = self.last_block_height
if self.block_preload._store and next(iter(self.block_preload._store)) < processed_height + 1:
for i in range(next(iter(self.block_preload._store)), self.last_block_height+1):
try:
self.block_preload.remove(i)
except:
pass
if self.block_preload._store_size < self.block_preload_cache_limit * 0.9:
continue
await asyncio.sleep(10)
# remove unused items
"""