connector

This commit is contained in:
4tochka 2019-05-02 12:55:34 +04:00
parent a4c0c03dec
commit e37f623534

View File

@ -77,7 +77,7 @@ class Connector:
# cache and system
self.preload = preload
self.block_preload = Cache(max_size=50000)
self.block_preload = Cache(max_size=500)
self.block_hashes = Cache(max_size=100000)
self.block_hashes_preload_mutex = False
self.tx_cache = Cache(max_size=50000)
@ -300,7 +300,8 @@ class Connector:
h = self.block_hashes.get(self.last_block_height + 1)
if h is None:
h = await self.rpc.getblockhash(self.last_block_height + 1)
self.loop.create_task(self.preload_block_hashes())
if not self.block_hashes_preload_mutex:
self.loop.create_task(self.preload_block_hashes())
else:
h = await self.rpc.getblockhash(self.last_block_height + 1)
self.blocks_download_time += time.time() - q
@ -315,7 +316,9 @@ class Connector:
try:
if self.deep_synchronization:
q = time.time()
raw_block = await self.rpc.getblock(hash, 0)
raw_block = self.block_preload.get(hash)
if not raw_block:
raw_block = await self.rpc.getblock(hash, 0)
self.blocks_download_time += time.time() - q
q = time.time()
block = decode_block_tx(raw_block)
@ -607,35 +610,56 @@ class Connector:
return
try:
self.block_hashes_preload_mutex = True
self.block_hashes = Cache(max_size=100000)
self.block_hashes = Cache(max_size=500)
max_height = self.node_last_block - self.deep_synchronization
height = self.last_block_height + 1
lh = height
l = height + 100000
if l > max_height:
l = max_height
while height < l:
try:
batch = list()
while True:
batch.append(["getblockhash", height])
if len(batch) >= self.batch_limit or height >= l:
height += 1
break
height += 1
result = await self.rpc.batch(batch)
for r in result:
try:
self.block_hashes.set(lh, r["result"])
except:
pass
lh += 1
processed_height = self.last_block_height
while height < max_height:
if self.last_block_height - height < 400:
try:
batch = list()
h_list = list()
while True:
batch.append(["getblockhash", height])
h_list.append(height)
if len(batch) >= self.batch_limit or height >= max_height:
height += 1
break
height += 1
result = await self.rpc.batch(batch)
headers = list()
batch = list()
for lh, r in zip(h_list, result):
try:
self.block_hashes.set(lh, r["result"])
batch.append(["getblock", r["result"], 0])
headers.append(r["result"])
except:
pass
blocks = await self.rpc.batch(batch)
for x,y in zip(headers, blocks):
try:
self.block_preload.set(x, y)
except:
pass
except asyncio.CancelledError:
self.log.info("connector preload_block_hashes failed")
break
except:
pass
if self.block_preload.len() < 500:
continue
await asyncio.sleep(10)
# remove unused items
[self.block_preload.remove(i) for i in range(processed_height,
self.last_block_height)]
processed_height = self.last_block_height
except asyncio.CancelledError:
self.log.info("connector preload_block_hashes failed")
break
except:
pass
finally:
self.block_hashes_preload_mutex = False
@ -932,6 +956,12 @@ class Cache():
except:
return None
def remove(self, key):
try:
del self._store[key]
except:
pass
def pop_last(self):
try:
i = next(reversed(self._store))