connector

This commit is contained in:
4tochka 2019-05-02 15:19:24 +04:00
parent 6c1209de00
commit e3de215607

View File

@ -298,15 +298,22 @@ class Connector:
self.deep_synchronization = False self.deep_synchronization = False
q = time.time() q = time.time()
if self.deep_synchronization: if self.deep_synchronization:
h = self.block_hashes.get(self.last_block_height + 1) raw_block = self.block_preload.get(self.last_block_height + 1)
if h is None: if raw_block:
h = await self.rpc.getblockhash(self.last_block_height + 1) block = decode_block_tx(raw_block)
if not self.block_hashes_preload_mutex: else:
self.loop.create_task(self.preload_block_hashes()) h = self.block_hashes.get(self.last_block_height + 1)
if h is None:
h = await self.rpc.getblockhash(self.last_block_height + 1)
if not self.block_hashes_preload_mutex:
self.loop.create_task(self.preload_block_hashes())
block = await self._get_block_by_hash(h)
else: else:
h = await self.rpc.getblockhash(self.last_block_height + 1) h = await self.rpc.getblockhash(self.last_block_height + 1)
block = await self._get_block_by_hash(h)
self.blocks_download_time += time.time() - q self.blocks_download_time += time.time() - q
await self._get_block_by_hash(h)
self.loop.create_task(self._new_block(block))
except Exception as err: except Exception as err:
self.log.error("get next block failed %s" % str(err)) self.log.error("get next block failed %s" % str(err))
finally: finally:
@ -317,10 +324,8 @@ class Connector:
try: try:
if self.deep_synchronization: if self.deep_synchronization:
q = time.time() q = time.time()
raw_block = self.block_preload.get(hash) self.non_cached_blocks += 1
if not raw_block: raw_block = await self.rpc.getblock(hash, 0)
self.non_cached_blocks += 1
raw_block = await self.rpc.getblock(hash, 0)
self.blocks_download_time += time.time() - q self.blocks_download_time += time.time() - q
q = time.time() q = time.time()
block = decode_block_tx(raw_block) block = decode_block_tx(raw_block)
@ -329,7 +334,7 @@ class Connector:
q = time.time() q = time.time()
block = await self.rpc.getblock(hash) block = await self.rpc.getblock(hash)
self.blocks_download_time += time.time() - q self.blocks_download_time += time.time() - q
self.loop.create_task(self._new_block(block)) return block
except Exception: except Exception:
self.log.error("get block by hash %s FAILED" % hash) self.log.error("get block by hash %s FAILED" % hash)
self.log.error(str(traceback.format_exc())) self.log.error(str(traceback.format_exc()))
@ -633,21 +638,21 @@ class Connector:
break break
height += 1 height += 1
result = await self.rpc.batch(batch) result = await self.rpc.batch(batch)
headers = list() h = list()
batch = list() batch = list()
for lh, r in zip(h_list, result): for lh, r in zip(h_list, result):
try: try:
self.block_hashes.set(lh, r["result"]) self.block_hashes.set(lh, r["result"])
batch.append(["getblock", r["result"], 0]) batch.append(["getblock", r["result"], 0])
headers.append(r["result"]) h.append(lh)
except: except:
pass pass
blocks = await self.rpc.batch(batch) blocks = await self.rpc.batch(batch)
for x,y in zip(headers, blocks): for x,y in zip(h,blocks):
try: try:
self.block_preload.set(x, y["result"]) self.block_preload.set(x, (y["result"]))
except: except:
pass pass
@ -656,13 +661,17 @@ class Connector:
break break
except: except:
pass pass
[self.block_preload.remove(i) for i in range(processed_height,
self.last_block_height)] if self.block_preload.len() < 50000:
continue
for i in range(processed_height, self.last_block_height):
self.block_preload.remove(i)
processed_height = self.last_block_height
if self.block_preload.len() < 50000: if self.block_preload.len() < 50000:
continue continue
await asyncio.sleep(10) await asyncio.sleep(10)
# remove unused items # remove unused items
processed_height = self.last_block_height
finally: finally:
self.block_hashes_preload_mutex = False self.block_hashes_preload_mutex = False