diff --git a/pybtc/connector/block_loader.py b/pybtc/connector/block_loader.py index dd18cf5..2fbc0ce 100644 --- a/pybtc/connector/block_loader.py +++ b/pybtc/connector/block_loader.py @@ -80,9 +80,9 @@ class BlockLoader: self.worker_busy[i] = True if height <= self.parent.last_block_height: height = self.parent.last_block_height + 1 - self.pipe_sent_msg(self.worker[i].writer, b'rpc_batch_limit', + await self.pipe_sent_msg(self.worker[i].writer, b'rpc_batch_limit', int_to_bytes(self.rpc_batch_limit)) - self.pipe_sent_msg(self.worker[i].writer, b'get', int_to_bytes(height)) + await self.pipe_sent_msg(self.worker[i].writer, b'get', int_to_bytes(height)) height += self.rpc_batch_limit new_requests += 1 if not new_requests: @@ -162,7 +162,7 @@ class BlockLoader: except: return b'pipe_read_error', b'' - def pipe_sent_msg(self, writer, msg_type, msg): + async def pipe_sent_msg(self, writer, msg_type, msg): msg_type = msg_type[:20].ljust(20) msg = msg_type + msg msg = b''.join((b'ME', len(msg).to_bytes(4, byteorder='little'), msg)) @@ -307,15 +307,16 @@ class Worker: # pass blocks[x] = pickle.dumps(blocks[x]) - self.pipe_sent_msg(b'result', pickle.dumps(blocks)) + await self.pipe_sent_msg(b'result', pickle.dumps(blocks)) except: - self.pipe_sent_msg(b'result', pickle.dumps([])) + await self.pipe_sent_msg(b'result', pickle.dumps([])) self.log.critical(str(traceback.format_exc())) async def message_loop(self): try: self.rpc = aiojsonrpc.rpc(self.rpc_url, self.loop, timeout=self.rpc_timeout) self.reader = await self.get_pipe_reader(self.in_reader) + self.writer = await self.get_pipe_writer(self.out_writer) while True: msg_type, msg = await self.pipe_get_msg(self.reader) if msg_type == b'pipe_read_error': @@ -345,6 +346,14 @@ class Worker: return None return reader + async def get_pipe_writer(self, fd_writer): + try: + wt, wp = await self.loop.connect_write_pipe(asyncio.streams.FlowControlMixin, fd_writer) + writer = asyncio.streams.StreamWriter(wt, wp, None, self.loop) + except: + return None + return writer + async def pipe_get_msg(self, reader): while True: try: @@ -362,11 +371,11 @@ class Worker: except: return b'pipe_read_error', b'' - def pipe_sent_msg(self, msg_type, msg): + async def pipe_sent_msg(self, msg_type, msg): msg_type = msg_type[:20].ljust(20) msg = msg_type + msg msg = b''.join((b'ME', len(msg).to_bytes(4, byteorder='little'), msg)) self.out_writer.write(msg) - self.out_writer.flush() + await self.out_writer.drain()