connector

This commit is contained in:
4tochka 2019-05-16 08:53:44 +04:00
parent 84b702e443
commit b60fe22663

View File

@ -19,6 +19,8 @@ class BlockLoader:
def __init__(self, parent, workers=8):
self.worker_limit = workers
self.worker = dict()
self.worker_reader = dict()
self.worker_writer = dict()
self.worker_tasks = list()
self.worker_busy = dict()
self.parent = parent
@ -79,9 +81,13 @@ class BlockLoader:
self.worker_busy[i] = True
if height <= self.parent.last_block_height:
height = self.parent.last_block_height + 1
self.pipe_sent_msg(self.worker[i].writer, b'rpc_batch_limit',
# self.pipe_sent_msg(self.worker[i].writer, b'rpc_batch_limit',
# int_to_bytes(self.rpc_batch_limit))
# self.pipe_sent_msg(self.worker[i].writer, b'get', int_to_bytes(height))
self.pipe_sent_msg(self.worker_writer[i], b'rpc_batch_limit',
int_to_bytes(self.rpc_batch_limit))
self.pipe_sent_msg(self.worker[i].writer, b'get', int_to_bytes(height))
self.pipe_sent_msg(self.worker_writer[i], b'get', int_to_bytes(height))
height += self.rpc_batch_limit
new_requests += 1
if not new_requests:
@ -107,22 +113,27 @@ class BlockLoader:
in_writer, out_writer = os.fdopen(in_writer,'wb'), os.fdopen(out_writer,'wb')
# create new process
worker = Process(target=Worker, args=(index, in_reader, in_writer, out_reader, out_writer,
self.rpc_url, self.rpc_timeout, self.rpc_batch_limit))
worker.start()
# worker = Process(target=Worker, args=(index, in_reader, in_writer, out_reader, out_writer,
# self.rpc_url, self.rpc_timeout, self.rpc_batch_limit))
# worker.start()
k = self.loop.create_task(Worker(index, in_reader, in_writer, out_reader, out_writer,
self.rpc_url, self.rpc_timeout, self.rpc_batch_limit))
in_reader.close()
out_writer.close()
# get stream reader
worker.reader = await self.get_pipe_reader(out_reader)
worker.writer = in_writer
worker.name = str(index)
self.worker[index] = worker
# worker.reader = await self.get_pipe_reader(out_reader)
# worker.writer = in_writer
# worker.name = str(index)
self.worker_reader[index] = out_reader
self.worker_writer[index] = in_writer
# self.worker[index] = worker
self.worker_busy[index] = False
# start message loop
self.loop.create_task(self.message_loop(index))
# wait if process crash
r = await self.loop.run_in_executor(None, worker.join)
del self.worker[index]
# r = await self.loop.run_in_executor(None, worker.join)
# del self.worker[index]
r =0
self.log.warning('Block loader worker %s is stopped [%s]' % (index, r))
@ -164,7 +175,8 @@ class BlockLoader:
async def message_loop(self, index):
while True:
msg_type, msg = await self.pipe_get_msg(self.worker[index].reader)
# msg_type, msg = await self.pipe_get_msg(self.worker[index].reader)
msg_type, msg = await self.pipe_get_msg(self.worker_reader[index])
if msg_type == b'pipe_read_error':
if not self.worker[index].is_alive():
return
@ -194,7 +206,7 @@ class Worker:
def __init__(self, name , in_reader, in_writer, out_reader, out_writer,
rpc_url, rpc_timeout, rpc_batch_limit):
setproctitle('Block loader: worker %s' % name)
# setproctitle('Block loader: worker %s' % name)
self.rpc_url = rpc_url
self.rpc_timeout = rpc_timeout
self.rpc_batch_limit = rpc_batch_limit