From b60fe22663234c70166334d3dd49a15b0a6140c5 Mon Sep 17 00:00:00 2001 From: 4tochka Date: Thu, 16 May 2019 08:53:44 +0400 Subject: [PATCH] connector --- pybtc/connector/block_loader.py | 38 ++++++++++++++++++++++----------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/pybtc/connector/block_loader.py b/pybtc/connector/block_loader.py index c61e1ac..2cc9e6c 100644 --- a/pybtc/connector/block_loader.py +++ b/pybtc/connector/block_loader.py @@ -19,6 +19,8 @@ class BlockLoader: def __init__(self, parent, workers=8): self.worker_limit = workers self.worker = dict() + self.worker_reader = dict() + self.worker_writer = dict() self.worker_tasks = list() self.worker_busy = dict() self.parent = parent @@ -79,9 +81,13 @@ class BlockLoader: self.worker_busy[i] = True if height <= self.parent.last_block_height: height = self.parent.last_block_height + 1 - self.pipe_sent_msg(self.worker[i].writer, b'rpc_batch_limit', + # self.pipe_sent_msg(self.worker[i].writer, b'rpc_batch_limit', + # int_to_bytes(self.rpc_batch_limit)) + # self.pipe_sent_msg(self.worker[i].writer, b'get', int_to_bytes(height)) + self.pipe_sent_msg(self.worker_writer[i], b'rpc_batch_limit', int_to_bytes(self.rpc_batch_limit)) - self.pipe_sent_msg(self.worker[i].writer, b'get', int_to_bytes(height)) + self.pipe_sent_msg(self.worker_writer[i], b'get', int_to_bytes(height)) + height += self.rpc_batch_limit new_requests += 1 if not new_requests: @@ -107,22 +113,27 @@ class BlockLoader: in_writer, out_writer = os.fdopen(in_writer,'wb'), os.fdopen(out_writer,'wb') # create new process - worker = Process(target=Worker, args=(index, in_reader, in_writer, out_reader, out_writer, - self.rpc_url, self.rpc_timeout, self.rpc_batch_limit)) - worker.start() + # worker = Process(target=Worker, args=(index, in_reader, in_writer, out_reader, out_writer, + # self.rpc_url, self.rpc_timeout, self.rpc_batch_limit)) + # worker.start() + k = self.loop.create_task(Worker(index, in_reader, in_writer, out_reader, out_writer, + self.rpc_url, self.rpc_timeout, self.rpc_batch_limit)) in_reader.close() out_writer.close() # get stream reader - worker.reader = await self.get_pipe_reader(out_reader) - worker.writer = in_writer - worker.name = str(index) - self.worker[index] = worker + # worker.reader = await self.get_pipe_reader(out_reader) + # worker.writer = in_writer + # worker.name = str(index) + self.worker_reader[index] = out_reader + self.worker_writer[index] = in_writer + # self.worker[index] = worker self.worker_busy[index] = False # start message loop self.loop.create_task(self.message_loop(index)) # wait if process crash - r = await self.loop.run_in_executor(None, worker.join) - del self.worker[index] + # r = await self.loop.run_in_executor(None, worker.join) + # del self.worker[index] + r =0 self.log.warning('Block loader worker %s is stopped [%s]' % (index, r)) @@ -164,7 +175,8 @@ class BlockLoader: async def message_loop(self, index): while True: - msg_type, msg = await self.pipe_get_msg(self.worker[index].reader) + # msg_type, msg = await self.pipe_get_msg(self.worker[index].reader) + msg_type, msg = await self.pipe_get_msg(self.worker_reader[index]) if msg_type == b'pipe_read_error': if not self.worker[index].is_alive(): return @@ -194,7 +206,7 @@ class Worker: def __init__(self, name , in_reader, in_writer, out_reader, out_writer, rpc_url, rpc_timeout, rpc_batch_limit): - setproctitle('Block loader: worker %s' % name) + # setproctitle('Block loader: worker %s' % name) self.rpc_url = rpc_url self.rpc_timeout = rpc_timeout self.rpc_batch_limit = rpc_batch_limit