connector

This commit is contained in:
4tochka 2019-05-07 01:54:38 +04:00
parent 6bb53e9290
commit 11ee819c93
2 changed files with 39 additions and 13 deletions

View File

@ -6,12 +6,17 @@ from setproctitle import setproctitle
import logging import logging
import signal import signal
import sys import sys
import aiojsonrpc
class BlockLoader: class BlockLoader:
def __init__(self, parent, workers=4): def __init__(self, parent, workers=4):
self.worker = dict() self.worker = dict()
self.worker_busy = dict()
self.log = parent.log self.log = parent.log
self.loop = parent.loop self.loop = parent.loop
self.rpc_url = parent.rpc_url
self.rpc_timeout = parent.rpc_timeout
self.rpc_batch_limit = parent.rpc_batch_limit
self.loop.set_default_executor(ThreadPoolExecutor(workers * 2)) self.loop.set_default_executor(ThreadPoolExecutor(workers * 2))
[self.loop.create_task(self.start_worker(i)) for i in range(workers)] [self.loop.create_task(self.start_worker(i)) for i in range(workers)]
@ -24,7 +29,8 @@ class BlockLoader:
in_writer, out_writer = os.fdopen(in_writer,'wb'), os.fdopen(out_writer,'wb') in_writer, out_writer = os.fdopen(in_writer,'wb'), os.fdopen(out_writer,'wb')
# create new process # create new process
worker = Process(target=Worker, args=(index, in_reader, in_writer, out_reader, out_writer)) worker = Process(target=Worker, args=(index, in_reader, in_writer, out_reader, out_writer,
self.rpc_url, self.rpc_timeout, self.rpc_batch_limit))
worker.start() worker.start()
in_reader.close() in_reader.close()
out_writer.close() out_writer.close()
@ -33,6 +39,7 @@ class BlockLoader:
worker.writer = in_writer worker.writer = in_writer
worker.name = str(index) worker.name = str(index)
self.worker[index] = worker self.worker[index] = worker
self.worker_busy[index] = False
# start message loop # start message loop
self.loop.create_task(self.message_loop(self.worker[index])) self.loop.create_task(self.message_loop(self.worker[index]))
# wait if process crash # wait if process crash
@ -40,6 +47,18 @@ class BlockLoader:
del self.worker[index] del self.worker[index]
self.log.warning('Block loader worker %s is stopped' % index) self.log.warning('Block loader worker %s is stopped' % index)
async def load_blocks(self, batch):
while True:
for i in self.worker_busy:
if not self.worker_busy[i]:
self.worker_busy[i] = True
try:
self.pipe_sent_msg(self.worker[i].writer, b'get', batch)
finally:
self.worker_busy[i] = False
return None
await asyncio.sleep(1)
async def get_pipe_reader(self, fd_reader): async def get_pipe_reader(self, fd_reader):
reader = asyncio.StreamReader() reader = asyncio.StreamReader()
@ -100,9 +119,14 @@ class BlockLoader:
class Worker: class Worker:
def __init__(self, name , in_reader, in_writer, out_reader, out_writer): def __init__(self, name , in_reader, in_writer, out_reader, out_writer,
rpc_url, rpc_timeout, rpc_batch_limit):
setproctitle('Block loader: worker %s' % name) setproctitle('Block loader: worker %s' % name)
self.rpc_url = rpc_url
self.rpc_timeout = rpc_timeout
self.rpc_batch_limit = rpc_batch_limit
self.name = name self.name = name
in_writer.close() in_writer.close()
out_reader.close() out_reader.close()
policy = asyncio.get_event_loop_policy() policy = asyncio.get_event_loop_policy()
@ -115,8 +139,21 @@ class Worker:
self.in_reader = in_reader self.in_reader = in_reader
signal.signal(signal.SIGTERM, self.terminate) signal.signal(signal.SIGTERM, self.terminate)
self.loop.create_task(self.message_loop()) self.loop.create_task(self.message_loop())
self.rpc = aiojsonrpc.rpc(self.rpc_url, self.loop, timeout=self.rpc_timeout)
self.loop.run_forever() self.loop.run_forever()
async def message_loop(self):
self.reader = await self.get_pipe_reader(self.in_reader)
while True:
msg_type, msg = await self.pipe_get_msg(self.reader)
if msg_type == b'pipe_read_error':
return
if msg_type == b'get':
self.log.critical(str(len(msg)))
continue
def terminate(self,a,b): def terminate(self,a,b):
sys.exit(0) sys.exit(0)
@ -156,15 +193,5 @@ class Worker:
async def message_loop(self):
self.reader = await self.get_pipe_reader(self.in_reader)
while True:
msg_type, msg = await self.pipe_get_msg(self.reader)
if msg_type == b'pipe_read_error':
return
if msg_type == b'result':
msg
continue

View File

@ -657,7 +657,6 @@ class Connector:
async def preload_blocks(self): async def preload_blocks(self):
return
if self.block_hashes_preload_mutex: if self.block_hashes_preload_mutex:
return return
try: try: