connector
This commit is contained in:
parent
1a9ffa00b5
commit
548347f32b
@ -9,6 +9,7 @@ import signal
|
||||
import sys
|
||||
import aiojsonrpc
|
||||
import traceback
|
||||
from pybtc.connector.utils import decode_block_tx
|
||||
import pickle
|
||||
|
||||
class BlockLoader:
|
||||
@ -73,6 +74,7 @@ class BlockLoader:
|
||||
self.log.error("Loading task error %s " % err)
|
||||
else:
|
||||
await asyncio.sleep(1)
|
||||
[p.terminate() for p in self.worker_tasks]
|
||||
|
||||
|
||||
|
||||
@ -149,7 +151,7 @@ class BlockLoader:
|
||||
continue
|
||||
|
||||
if msg_type == b'result':
|
||||
msg
|
||||
self.log.critical(str(len(msg)))
|
||||
continue
|
||||
|
||||
|
||||
@ -171,7 +173,6 @@ class Worker:
|
||||
self.rpc_timeout = rpc_timeout
|
||||
self.rpc_batch_limit = rpc_batch_limit
|
||||
self.name = name
|
||||
|
||||
in_writer.close()
|
||||
out_reader.close()
|
||||
policy = asyncio.get_event_loop_policy()
|
||||
@ -186,6 +187,30 @@ class Worker:
|
||||
self.loop.create_task(self.message_loop())
|
||||
self.loop.run_forever()
|
||||
|
||||
async def load_blocks(self, height):
|
||||
batch = list()
|
||||
h_list = list()
|
||||
while True:
|
||||
batch.append(["getblockhash", height])
|
||||
h_list.append(height)
|
||||
if len(batch) >= self.rpc_batch_limit:
|
||||
height += 1
|
||||
break
|
||||
height += 1
|
||||
result = await self.rpc.batch(batch)
|
||||
h = list()
|
||||
batch = list()
|
||||
for lh, r in zip(h_list, result):
|
||||
if r["result"] is not None:
|
||||
batch.append(["getblock", r["result"], 0])
|
||||
h.append(lh)
|
||||
result = await self.rpc.batch(batch)
|
||||
blocks = dict()
|
||||
for x, y in zip(h, result):
|
||||
if y["result"] is not None:
|
||||
blocks[x] = decode_block_tx(y["result"])
|
||||
self.pipe_sent_msg(b'result', pickle.dumps(blocks))
|
||||
|
||||
async def message_loop(self):
|
||||
try:
|
||||
self.rpc = aiojsonrpc.rpc(self.rpc_url, self.loop, timeout=self.rpc_timeout)
|
||||
@ -196,7 +221,7 @@ class Worker:
|
||||
return
|
||||
|
||||
if msg_type == b'get':
|
||||
self.log.critical(str(bytes_to_int(msg)))
|
||||
self.loop.create_task(self.load_blocks(bytes_to_int(msg)))
|
||||
continue
|
||||
except:
|
||||
self.log.critical("exc")
|
||||
@ -234,12 +259,12 @@ class Worker:
|
||||
except:
|
||||
return b'pipe_read_error', b''
|
||||
|
||||
def pipe_sent_msg(self, writer, msg_type, msg):
|
||||
def pipe_sent_msg(self, msg_type, msg):
|
||||
msg_type = msg_type[:20].ljust(20)
|
||||
msg = msg_type + msg
|
||||
msg = b''.join((b'ME', len(msg).to_bytes(4, byteorder='little'), msg))
|
||||
writer.write(msg)
|
||||
writer.flush()
|
||||
self.out_writer.write(msg)
|
||||
self.out_writer.flush()
|
||||
|
||||
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user