connector

This commit is contained in:
4tochka 2019-05-16 08:59:15 +04:00
parent b60fe22663
commit c351166620

View File

@ -19,8 +19,6 @@ class BlockLoader:
def __init__(self, parent, workers=8):
self.worker_limit = workers
self.worker = dict()
self.worker_reader = dict()
self.worker_writer = dict()
self.worker_tasks = list()
self.worker_busy = dict()
self.parent = parent
@ -81,13 +79,9 @@ class BlockLoader:
self.worker_busy[i] = True
if height <= self.parent.last_block_height:
height = self.parent.last_block_height + 1
# self.pipe_sent_msg(self.worker[i].writer, b'rpc_batch_limit',
# int_to_bytes(self.rpc_batch_limit))
# self.pipe_sent_msg(self.worker[i].writer, b'get', int_to_bytes(height))
self.pipe_sent_msg(self.worker_writer[i], b'rpc_batch_limit',
self.pipe_sent_msg(self.worker[i].writer, b'rpc_batch_limit',
int_to_bytes(self.rpc_batch_limit))
self.pipe_sent_msg(self.worker_writer[i], b'get', int_to_bytes(height))
self.pipe_sent_msg(self.worker[i].writer, b'get', int_to_bytes(height))
height += self.rpc_batch_limit
new_requests += 1
if not new_requests:
@ -113,28 +107,23 @@ class BlockLoader:
in_writer, out_writer = os.fdopen(in_writer,'wb'), os.fdopen(out_writer,'wb')
# create new process
# worker = Process(target=Worker, args=(index, in_reader, in_writer, out_reader, out_writer,
# self.rpc_url, self.rpc_timeout, self.rpc_batch_limit))
# worker.start()
k = self.loop.create_task(Worker(index, in_reader, in_writer, out_reader, out_writer,
self.rpc_url, self.rpc_timeout, self.rpc_batch_limit))
worker = Process(target=Worker, args=(index, in_reader, in_writer, out_reader, out_writer,
self.rpc_url, self.rpc_timeout, self.rpc_batch_limit))
worker.start()
in_reader.close()
out_writer.close()
# get stream reader
# worker.reader = await self.get_pipe_reader(out_reader)
# worker.writer = in_writer
# worker.name = str(index)
self.worker_reader[index] = out_reader
self.worker_writer[index] = in_writer
# self.worker[index] = worker
worker.reader = await self.get_pipe_reader(out_reader)
worker.writer = in_writer
worker.name = str(index)
self.worker[index] = worker
self.worker_busy[index] = False
# start message loop
self.loop.create_task(self.message_loop(index))
# wait if process crash
# r = await self.loop.run_in_executor(None, worker.join)
# del self.worker[index]
r =0
self.log.warning('Block loader worker %s is stopped [%s]' % (index, r))
await self.loop.run_in_executor(None, worker.join)
del self.worker[index]
self.log.warning('Block loader worker %s is stopped' % index)
@ -175,8 +164,7 @@ class BlockLoader:
async def message_loop(self, index):
while True:
# msg_type, msg = await self.pipe_get_msg(self.worker[index].reader)
msg_type, msg = await self.pipe_get_msg(self.worker_reader[index])
msg_type, msg = await self.pipe_get_msg(self.worker[index].reader)
if msg_type == b'pipe_read_error':
if not self.worker[index].is_alive():
return
@ -206,7 +194,7 @@ class Worker:
def __init__(self, name , in_reader, in_writer, out_reader, out_writer,
rpc_url, rpc_timeout, rpc_batch_limit):
# setproctitle('Block loader: worker %s' % name)
setproctitle('Block loader: worker %s' % name)
self.rpc_url = rpc_url
self.rpc_timeout = rpc_timeout
self.rpc_batch_limit = rpc_batch_limit
@ -221,8 +209,8 @@ class Worker:
self.loop.set_default_executor(ThreadPoolExecutor(20))
self.out_writer = out_writer
self.in_reader = in_reader
self.coins = MRU(100000)
self.destroyed_coins = MRU(100000)
# self.coins = MRU(100000)
# self.destroyed_coins = MRU(100000)
signal.signal(signal.SIGTERM, self.terminate)
self.loop.create_task(self.message_loop())
self.loop.run_forever()
@ -257,10 +245,10 @@ class Worker:
inp = block["rawTx"][z]["vIn"][i]
outpoint = b"".join((inp["txId"], int_to_bytes(inp["vOut"])))
try:
r = self.coins.delete(outpoint)
block["rawTx"][z]["vIn"][i]["_c_"] = r
# r = self.coins.delete(outpoint)
# block["rawTx"][z]["vIn"][i]["_c_"] = r
t += 1
self.destroyed_coins[r[0]] = True
# self.destroyed_coins[r[0]] = True
except:
pass
for i in block["rawTx"][z]["vOut"]:
@ -271,7 +259,7 @@ class Worker:
except:
address = b"".join((bytes([block["rawTx"][z]["vOut"][i]["nType"]]),
block["rawTx"][z]["vOut"][i]["addressHash"]))
self.coins[o] = (pointer, block["rawTx"][z]["vOut"][i]["value"], address)
# self.coins[o] = (pointer, block["rawTx"][z]["vOut"][i]["value"], address)
blocks[x] = block
if blocks:
blocks[x]["checkpoint"] = x
@ -280,7 +268,7 @@ class Worker:
for i in blocks[x]["rawTx"][y]["vOut"]:
try:
pointer = (x << 42) + (y << 21) + i
r = self.destroyed_coins.delete(pointer)
# r = self.destroyed_coins.delete(pointer)
blocks[x]["rawTx"][y]["vOut"][i]["_s_"] = r
except: pass
@ -298,7 +286,6 @@ class Worker:
while True:
msg_type, msg = await self.pipe_get_msg(self.reader)
if msg_type == b'pipe_read_error':
self.log.critical("pipe_read_error")
return
if msg_type == b'get':