connector
This commit is contained in:
parent
6be3f0a68a
commit
19d1b2b948
@ -80,9 +80,9 @@ class BlockLoader:
|
|||||||
self.worker_busy[i] = True
|
self.worker_busy[i] = True
|
||||||
if height <= self.parent.last_block_height:
|
if height <= self.parent.last_block_height:
|
||||||
height = self.parent.last_block_height + 1
|
height = self.parent.last_block_height + 1
|
||||||
self.pipe_sent_msg(self.worker[i].writer, b'rpc_batch_limit',
|
await self.pipe_sent_msg(self.worker[i].writer, b'rpc_batch_limit',
|
||||||
int_to_bytes(self.rpc_batch_limit))
|
int_to_bytes(self.rpc_batch_limit))
|
||||||
self.pipe_sent_msg(self.worker[i].writer, b'get', int_to_bytes(height))
|
await self.pipe_sent_msg(self.worker[i].writer, b'get', int_to_bytes(height))
|
||||||
height += self.rpc_batch_limit
|
height += self.rpc_batch_limit
|
||||||
new_requests += 1
|
new_requests += 1
|
||||||
if not new_requests:
|
if not new_requests:
|
||||||
@ -162,7 +162,7 @@ class BlockLoader:
|
|||||||
except:
|
except:
|
||||||
return b'pipe_read_error', b''
|
return b'pipe_read_error', b''
|
||||||
|
|
||||||
def pipe_sent_msg(self, writer, msg_type, msg):
|
async def pipe_sent_msg(self, writer, msg_type, msg):
|
||||||
msg_type = msg_type[:20].ljust(20)
|
msg_type = msg_type[:20].ljust(20)
|
||||||
msg = msg_type + msg
|
msg = msg_type + msg
|
||||||
msg = b''.join((b'ME', len(msg).to_bytes(4, byteorder='little'), msg))
|
msg = b''.join((b'ME', len(msg).to_bytes(4, byteorder='little'), msg))
|
||||||
@ -307,15 +307,16 @@ class Worker:
|
|||||||
# pass
|
# pass
|
||||||
|
|
||||||
blocks[x] = pickle.dumps(blocks[x])
|
blocks[x] = pickle.dumps(blocks[x])
|
||||||
self.pipe_sent_msg(b'result', pickle.dumps(blocks))
|
await self.pipe_sent_msg(b'result', pickle.dumps(blocks))
|
||||||
except:
|
except:
|
||||||
self.pipe_sent_msg(b'result', pickle.dumps([]))
|
await self.pipe_sent_msg(b'result', pickle.dumps([]))
|
||||||
self.log.critical(str(traceback.format_exc()))
|
self.log.critical(str(traceback.format_exc()))
|
||||||
|
|
||||||
async def message_loop(self):
|
async def message_loop(self):
|
||||||
try:
|
try:
|
||||||
self.rpc = aiojsonrpc.rpc(self.rpc_url, self.loop, timeout=self.rpc_timeout)
|
self.rpc = aiojsonrpc.rpc(self.rpc_url, self.loop, timeout=self.rpc_timeout)
|
||||||
self.reader = await self.get_pipe_reader(self.in_reader)
|
self.reader = await self.get_pipe_reader(self.in_reader)
|
||||||
|
self.writer = await self.get_pipe_writer(self.out_writer)
|
||||||
while True:
|
while True:
|
||||||
msg_type, msg = await self.pipe_get_msg(self.reader)
|
msg_type, msg = await self.pipe_get_msg(self.reader)
|
||||||
if msg_type == b'pipe_read_error':
|
if msg_type == b'pipe_read_error':
|
||||||
@ -345,6 +346,14 @@ class Worker:
|
|||||||
return None
|
return None
|
||||||
return reader
|
return reader
|
||||||
|
|
||||||
|
async def get_pipe_writer(self, fd_writer):
|
||||||
|
try:
|
||||||
|
wt, wp = await self.loop.connect_write_pipe(asyncio.streams.FlowControlMixin, fd_writer)
|
||||||
|
writer = asyncio.streams.StreamWriter(wt, wp, None, self.loop)
|
||||||
|
except:
|
||||||
|
return None
|
||||||
|
return writer
|
||||||
|
|
||||||
async def pipe_get_msg(self, reader):
|
async def pipe_get_msg(self, reader):
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
@ -362,11 +371,11 @@ class Worker:
|
|||||||
except:
|
except:
|
||||||
return b'pipe_read_error', b''
|
return b'pipe_read_error', b''
|
||||||
|
|
||||||
def pipe_sent_msg(self, msg_type, msg):
|
async def pipe_sent_msg(self, msg_type, msg):
|
||||||
msg_type = msg_type[:20].ljust(20)
|
msg_type = msg_type[:20].ljust(20)
|
||||||
msg = msg_type + msg
|
msg = msg_type + msg
|
||||||
msg = b''.join((b'ME', len(msg).to_bytes(4, byteorder='little'), msg))
|
msg = b''.join((b'ME', len(msg).to_bytes(4, byteorder='little'), msg))
|
||||||
self.out_writer.write(msg)
|
self.out_writer.write(msg)
|
||||||
self.out_writer.flush()
|
await self.out_writer.drain()
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user