From 11ee819c9304d44a2cb19f4f1bf692f9df488aae Mon Sep 17 00:00:00 2001 From: 4tochka Date: Tue, 7 May 2019 01:54:38 +0400 Subject: [PATCH] connector --- pybtc/connector/block_loader.py | 51 +++++++++++++++++++++++++-------- pybtc/connector/connector.py | 1 - 2 files changed, 39 insertions(+), 13 deletions(-) diff --git a/pybtc/connector/block_loader.py b/pybtc/connector/block_loader.py index 19bfad7..c174540 100644 --- a/pybtc/connector/block_loader.py +++ b/pybtc/connector/block_loader.py @@ -6,12 +6,17 @@ from setproctitle import setproctitle import logging import signal import sys +import aiojsonrpc class BlockLoader: def __init__(self, parent, workers=4): self.worker = dict() + self.worker_busy = dict() self.log = parent.log self.loop = parent.loop + self.rpc_url = parent.rpc_url + self.rpc_timeout = parent.rpc_timeout + self.rpc_batch_limit = parent.rpc_batch_limit self.loop.set_default_executor(ThreadPoolExecutor(workers * 2)) [self.loop.create_task(self.start_worker(i)) for i in range(workers)] @@ -24,7 +29,8 @@ class BlockLoader: in_writer, out_writer = os.fdopen(in_writer,'wb'), os.fdopen(out_writer,'wb') # create new process - worker = Process(target=Worker, args=(index, in_reader, in_writer, out_reader, out_writer)) + worker = Process(target=Worker, args=(index, in_reader, in_writer, out_reader, out_writer, + self.rpc_url, self.rpc_timeout, self.rpc_batch_limit)) worker.start() in_reader.close() out_writer.close() @@ -33,6 +39,7 @@ class BlockLoader: worker.writer = in_writer worker.name = str(index) self.worker[index] = worker + self.worker_busy[index] = False # start message loop self.loop.create_task(self.message_loop(self.worker[index])) # wait if process crash @@ -40,6 +47,18 @@ class BlockLoader: del self.worker[index] self.log.warning('Block loader worker %s is stopped' % index) + async def load_blocks(self, batch): + while True: + for i in self.worker_busy: + if not self.worker_busy[i]: + self.worker_busy[i] = True + try: + self.pipe_sent_msg(self.worker[i].writer, b'get', batch) + finally: + self.worker_busy[i] = False + return None + await asyncio.sleep(1) + async def get_pipe_reader(self, fd_reader): reader = asyncio.StreamReader() @@ -100,9 +119,14 @@ class BlockLoader: class Worker: - def __init__(self, name , in_reader, in_writer, out_reader, out_writer): + def __init__(self, name , in_reader, in_writer, out_reader, out_writer, + rpc_url, rpc_timeout, rpc_batch_limit): setproctitle('Block loader: worker %s' % name) + self.rpc_url = rpc_url + self.rpc_timeout = rpc_timeout + self.rpc_batch_limit = rpc_batch_limit self.name = name + in_writer.close() out_reader.close() policy = asyncio.get_event_loop_policy() @@ -115,8 +139,21 @@ class Worker: self.in_reader = in_reader signal.signal(signal.SIGTERM, self.terminate) self.loop.create_task(self.message_loop()) + self.rpc = aiojsonrpc.rpc(self.rpc_url, self.loop, timeout=self.rpc_timeout) self.loop.run_forever() + async def message_loop(self): + self.reader = await self.get_pipe_reader(self.in_reader) + while True: + msg_type, msg = await self.pipe_get_msg(self.reader) + if msg_type == b'pipe_read_error': + return + + if msg_type == b'get': + self.log.critical(str(len(msg))) + continue + + def terminate(self,a,b): sys.exit(0) @@ -156,15 +193,5 @@ class Worker: - async def message_loop(self): - self.reader = await self.get_pipe_reader(self.in_reader) - while True: - msg_type, msg = await self.pipe_get_msg(self.reader) - if msg_type == b'pipe_read_error': - return - - if msg_type == b'result': - msg - continue diff --git a/pybtc/connector/connector.py b/pybtc/connector/connector.py index 73bb4a3..c6c456e 100644 --- a/pybtc/connector/connector.py +++ b/pybtc/connector/connector.py @@ -657,7 +657,6 @@ class Connector: async def preload_blocks(self): - return if self.block_hashes_preload_mutex: return try: