connector

This commit is contained in:
4tochka 2019-06-03 14:21:15 +04:00
parent 7e0ee2a9bb
commit d799227231

View File

@ -65,7 +65,7 @@ class BlockLoader:
self.worker_tasks = [self.loop.create_task(self.start_worker(i)) for i in range(self.worker_limit)] self.worker_tasks = [self.loop.create_task(self.start_worker(i)) for i in range(self.worker_limit)]
target_height = self.parent.node_last_block - self.parent.deep_sync_limit target_height = self.parent.node_last_block - self.parent.deep_sync_limit
height = self.parent.last_block_height + 1 height = self.parent.last_block_height + 1
self.log.info(str(height))
while height < target_height: while height < target_height:
new_requests = 0 new_requests = 0
if self.parent.block_preload._store_size < self.parent.block_preload_cache_limit: if self.parent.block_preload._store_size < self.parent.block_preload_cache_limit:
@ -86,7 +86,7 @@ class BlockLoader:
if height <= self.parent.last_block_height: if height <= self.parent.last_block_height:
height = self.parent.last_block_height + 1 height = self.parent.last_block_height + 1
await self.pipe_sent_msg(self.worker[i].writer, b'rpc_batch_limit', await self.pipe_sent_msg(self.worker[i].writer, b'rpc_batch_limit',
int_to_bytes(self.rpc_batch_limit)) int_to_bytes(self.rpc_batch_limit))
await self.pipe_sent_msg(self.worker[i].writer, b'get', int_to_bytes(height)) await self.pipe_sent_msg(self.worker[i].writer, b'get', int_to_bytes(height))
height += self.rpc_batch_limit height += self.rpc_batch_limit
new_requests += 1 new_requests += 1
@ -223,19 +223,20 @@ class Worker:
policy.set_event_loop(policy.new_event_loop()) policy.set_event_loop(policy.new_event_loop())
self.loop = asyncio.get_event_loop() self.loop = asyncio.get_event_loop()
self.log = logging.getLogger("Block loader") self.log = logging.getLogger("Block loader")
self.log.setLevel(logging.INFO) self.log.setLevel(logging.DEBUG)
self.loop.set_default_executor(ThreadPoolExecutor(20)) self.loop.set_default_executor(ThreadPoolExecutor(20))
self.out_writer = out_writer self.out_writer = out_writer
self.in_reader = in_reader self.in_reader = in_reader
self.coins = MRU(4000000) self.coins = MRU(1000000)
self.destroyed_coins = MRU(4000000) self.destroyed_coins = MRU(1000000)
self.a_coins = MRU(4000000) self.a_coins = MRU(1000000)
signal.signal(signal.SIGTERM, self.terminate) signal.signal(signal.SIGTERM, self.terminate)
self.loop.create_task(self.message_loop()) self.loop.create_task(self.message_loop())
self.loop.run_forever() self.loop.run_forever()
async def load_blocks(self, height): async def load_blocks(self, height, limit):
try: try:
self.log.critical("%s block loader get from %s to %s" % (self.name, height, limit))
attempt = 10 attempt = 10
t = 0 t = 0
start_height = height start_height = height
@ -244,7 +245,7 @@ class Worker:
while True: while True:
batch.append(["getblockhash", height]) batch.append(["getblockhash", height])
h_list.append(height) h_list.append(height)
if len(batch) >= self.rpc_batch_limit: if len(batch) >= limit:
height += 1 height += 1
break break
height += 1 height += 1
@ -312,10 +313,14 @@ class Worker:
except: pass except: pass
blocks[x] = pickle.dumps(blocks[x]) blocks[x] = pickle.dumps(blocks[x])
self.log.critical("%s block loader blocks %s" %(self.name, len(blocks[x])))
self.log.critical("%s block loader checkpoint %s" %(self.name, blocks[x]["checkpoint"]))
await self.pipe_sent_msg(b'result', pickle.dumps(blocks)) await self.pipe_sent_msg(b'result', pickle.dumps(blocks))
except: except:
await self.pipe_sent_msg(b'result', pickle.dumps([])) self.log.critical("load blocks error")
self.log.critical(str(traceback.format_exc())) self.log.critical(str(traceback.format_exc()))
await self.pipe_sent_msg(b'result', pickle.dumps([]))
async def message_loop(self): async def message_loop(self):
try: try:
@ -328,7 +333,7 @@ class Worker:
return return
if msg_type == b'get': if msg_type == b'get':
self.loop.create_task(self.load_blocks(bytes_to_int(msg))) self.loop.create_task(self.load_blocks(bytes_to_int(msg), self.rpc_batch_limit))
continue continue
if msg_type == b'rpc_batch_limit': if msg_type == b'rpc_batch_limit':