diff --git a/pybtc/connector/block_loader.py b/pybtc/connector/block_loader.py index 2cc9e6c..facb5a3 100644 --- a/pybtc/connector/block_loader.py +++ b/pybtc/connector/block_loader.py @@ -19,8 +19,6 @@ class BlockLoader: def __init__(self, parent, workers=8): self.worker_limit = workers self.worker = dict() - self.worker_reader = dict() - self.worker_writer = dict() self.worker_tasks = list() self.worker_busy = dict() self.parent = parent @@ -81,13 +79,9 @@ class BlockLoader: self.worker_busy[i] = True if height <= self.parent.last_block_height: height = self.parent.last_block_height + 1 - # self.pipe_sent_msg(self.worker[i].writer, b'rpc_batch_limit', - # int_to_bytes(self.rpc_batch_limit)) - # self.pipe_sent_msg(self.worker[i].writer, b'get', int_to_bytes(height)) - self.pipe_sent_msg(self.worker_writer[i], b'rpc_batch_limit', + self.pipe_sent_msg(self.worker[i].writer, b'rpc_batch_limit', int_to_bytes(self.rpc_batch_limit)) - self.pipe_sent_msg(self.worker_writer[i], b'get', int_to_bytes(height)) - + self.pipe_sent_msg(self.worker[i].writer, b'get', int_to_bytes(height)) height += self.rpc_batch_limit new_requests += 1 if not new_requests: @@ -113,28 +107,23 @@ class BlockLoader: in_writer, out_writer = os.fdopen(in_writer,'wb'), os.fdopen(out_writer,'wb') # create new process - # worker = Process(target=Worker, args=(index, in_reader, in_writer, out_reader, out_writer, - # self.rpc_url, self.rpc_timeout, self.rpc_batch_limit)) - # worker.start() - k = self.loop.create_task(Worker(index, in_reader, in_writer, out_reader, out_writer, - self.rpc_url, self.rpc_timeout, self.rpc_batch_limit)) + worker = Process(target=Worker, args=(index, in_reader, in_writer, out_reader, out_writer, + self.rpc_url, self.rpc_timeout, self.rpc_batch_limit)) + worker.start() in_reader.close() out_writer.close() # get stream reader - # worker.reader = await self.get_pipe_reader(out_reader) - # worker.writer = in_writer - # worker.name = str(index) - self.worker_reader[index] = out_reader - self.worker_writer[index] = in_writer - # self.worker[index] = worker + worker.reader = await self.get_pipe_reader(out_reader) + worker.writer = in_writer + worker.name = str(index) + self.worker[index] = worker self.worker_busy[index] = False # start message loop self.loop.create_task(self.message_loop(index)) # wait if process crash - # r = await self.loop.run_in_executor(None, worker.join) - # del self.worker[index] - r =0 - self.log.warning('Block loader worker %s is stopped [%s]' % (index, r)) + await self.loop.run_in_executor(None, worker.join) + del self.worker[index] + self.log.warning('Block loader worker %s is stopped' % index) @@ -175,8 +164,7 @@ class BlockLoader: async def message_loop(self, index): while True: - # msg_type, msg = await self.pipe_get_msg(self.worker[index].reader) - msg_type, msg = await self.pipe_get_msg(self.worker_reader[index]) + msg_type, msg = await self.pipe_get_msg(self.worker[index].reader) if msg_type == b'pipe_read_error': if not self.worker[index].is_alive(): return @@ -206,7 +194,7 @@ class Worker: def __init__(self, name , in_reader, in_writer, out_reader, out_writer, rpc_url, rpc_timeout, rpc_batch_limit): - # setproctitle('Block loader: worker %s' % name) + setproctitle('Block loader: worker %s' % name) self.rpc_url = rpc_url self.rpc_timeout = rpc_timeout self.rpc_batch_limit = rpc_batch_limit @@ -221,8 +209,8 @@ class Worker: self.loop.set_default_executor(ThreadPoolExecutor(20)) self.out_writer = out_writer self.in_reader = in_reader - self.coins = MRU(100000) - self.destroyed_coins = MRU(100000) + # self.coins = MRU(100000) + # self.destroyed_coins = MRU(100000) signal.signal(signal.SIGTERM, self.terminate) self.loop.create_task(self.message_loop()) self.loop.run_forever() @@ -257,10 +245,10 @@ class Worker: inp = block["rawTx"][z]["vIn"][i] outpoint = b"".join((inp["txId"], int_to_bytes(inp["vOut"]))) try: - r = self.coins.delete(outpoint) - block["rawTx"][z]["vIn"][i]["_c_"] = r + # r = self.coins.delete(outpoint) + # block["rawTx"][z]["vIn"][i]["_c_"] = r t += 1 - self.destroyed_coins[r[0]] = True + # self.destroyed_coins[r[0]] = True except: pass for i in block["rawTx"][z]["vOut"]: @@ -271,7 +259,7 @@ class Worker: except: address = b"".join((bytes([block["rawTx"][z]["vOut"][i]["nType"]]), block["rawTx"][z]["vOut"][i]["addressHash"])) - self.coins[o] = (pointer, block["rawTx"][z]["vOut"][i]["value"], address) + # self.coins[o] = (pointer, block["rawTx"][z]["vOut"][i]["value"], address) blocks[x] = block if blocks: blocks[x]["checkpoint"] = x @@ -280,7 +268,7 @@ class Worker: for i in blocks[x]["rawTx"][y]["vOut"]: try: pointer = (x << 42) + (y << 21) + i - r = self.destroyed_coins.delete(pointer) + # r = self.destroyed_coins.delete(pointer) blocks[x]["rawTx"][y]["vOut"][i]["_s_"] = r except: pass @@ -298,7 +286,6 @@ class Worker: while True: msg_type, msg = await self.pipe_get_msg(self.reader) if msg_type == b'pipe_read_error': - self.log.critical("pipe_read_error") return if msg_type == b'get':