connector

4tochka 2019-05-03 00:01:00 +04:00
parent 42ab1ce174
commit 5d5383a789


@@ -24,9 +24,8 @@ class Connector:
                  block_timeout=30,
                  deep_sync_limit=20, backlog=0, mempool_tx=True,
                  rpc_batch_limit=50, rpc_threads_limit=100, rpc_timeout=100,
                  preload=False,
                  utxo_data=False,
-                 utxo_cache_size=2000000,
+                 utxo_cache_size=1000000,
                  skip_opreturn=True,
                  postgres_pool=None):
         self.loop = asyncio.get_event_loop()
@@ -69,6 +68,7 @@ class Connector:
         self.active_block.set_result(True)
         self.last_zmq_msg = int(time.time())
         self.total_received_tx = 0
+        self.total_received_tx_stat = 0
         self.blocks_processed_count = 0
         self.blocks_decode_time = 0
         self.blocks_download_time = 0
@@ -77,12 +77,15 @@ class Connector:
         self.start_time = time.time()
         # cache and system
         self.preload = preload
-        self.block_preload = Cache(max_size=500 * 1000000)
-        self.block_hashes = Cache(max_size=200 * 100000)
+        self.block_preload_cache_limit = 500 * 1000000
+        self.block_hashes_cache_limit = 200 * 100000
+        self.tx_cache_limit = 100 * 100000
+        self.block_headers_cache_limit = 100 * 100000
+        self.block_preload = Cache(max_size=self.block_preload_cache_limit)
+        self.block_hashes = Cache(max_size=self.block_hashes_cache_limit)
         self.block_hashes_preload_mutex = False
-        self.tx_cache = Cache(max_size=100 * 1000000)
-        self.block_cache = Cache(max_size=20 * 1000000)
+        self.tx_cache = Cache(max_size=self.tx_cache_limit)
+        self.block_headers_cache = Cache(max_size=self.block_headers_cache_limit)
         self.block_txs_request = None
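
The hunk above replaces hard-coded cache sizes with named byte limits and renames the block cache to block_headers_cache. The Cache class itself is not part of this diff; below is a minimal, hypothetical sketch of a byte-budgeted FIFO cache exposing the same surface the connector relies on (set, get, pop, pop_last, get_last_key, len, remove and the _store_size counter), using sys.getsizeof as a rough size estimate. This is an illustration, not the library's actual implementation.

from collections import OrderedDict
import sys

class Cache:
    """Hypothetical stand-in: evicts oldest entries once the approximate
    byte size of stored values exceeds max_size."""
    def __init__(self, max_size=100 * 100000):
        self.max_size = max_size
        self._store = OrderedDict()
        self._store_size = 0  # approximate payload bytes

    def set(self, key, value):
        self._store[key] = value
        self._store_size += sys.getsizeof(value)
        while self._store_size > self.max_size and self._store:
            _, old = self._store.popitem(last=False)  # drop oldest entry
            self._store_size -= sys.getsizeof(old)

    def get(self, key):
        return self._store.get(key)

    def pop(self, key):
        value = self._store.pop(key, None)
        if value is not None:
            self._store_size -= sys.getsizeof(value)
        return value

    remove = pop  # the connector also calls remove(key)

    def pop_last(self):
        if self._store:
            key, value = self._store.popitem(last=True)
            self._store_size -= sys.getsizeof(value)
            return key

    def get_last_key(self):
        return next(reversed(self._store)) if self._store else None

    def len(self):
        return len(self._store)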
@@ -142,7 +145,7 @@ class Connector:
         if h < len(self.chain_tail):
             raise Exception("Chain tail len not match last block height")
         for row in reversed(self.chain_tail):
-            self.block_cache.set(row, h)
+            self.block_headers_cache.set(row, h)
             h -= 1
         self.tasks.append(self.loop.create_task(self.zeromq_handler()))
@@ -308,7 +311,7 @@ class Connector:
             if h is None:
                 h = await self.rpc.getblockhash(self.last_block_height + 1)
             if not self.block_hashes_preload_mutex:
-                self.loop.create_task(self.preload_block_hashes())
+                self.loop.create_task(self.preload_blocks())
             block = await self._get_block_by_hash(h)
         else:
             h = await self.rpc.getblockhash(self.last_block_height + 1)
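
The preload task spawned here is guarded by the block_hashes_preload_mutex flag, so repeated get-next-block calls never start a second preloader. A minimal runnable sketch of that flag-guard pattern (names are illustrative; the real coroutine sets the flag on entry and clears it in the finally block shown further down):

import asyncio

class Preloader:
    def __init__(self):
        self.mutex = False

    async def preload(self):
        if self.mutex:          # a preloader is already running
            return
        self.mutex = True
        try:
            await asyncio.sleep(0.1)  # placeholder for the batched RPC work
        finally:
            self.mutex = False  # always release, even on errors

async def main():
    p = Preloader()
    # the second call returns immediately because the flag is already set
    await asyncio.gather(p.preload(), p.preload())

asyncio.run(main())

Because asyncio runs the coroutines on a single thread, a plain bool is race-free here as long as the check and the set happen without an intervening await.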
@@ -342,7 +345,7 @@ class Connector:
             self.log.error(str(traceback.format_exc()))

     async def _new_block(self, block):
-        if self.block_cache.get(block["hash"]) is not None:
+        if self.block_headers_cache.get(block["hash"]) is not None:
             return
         if self.deep_synchronization:
             block["height"] = self.last_block_height + 1
@@ -369,7 +372,7 @@ class Connector:
             if self.block_handler and not self.cache_loading:
                 await self.block_handler(block)
-            self.block_cache.set(block["hash"], block["height"])
+            self.block_headers_cache.set(block["hash"], block["height"])
             self.last_block_height = block["height"]
             if self.utxo_data:
                 self.loop.create_task(self.utxo.save_utxo(block["height"]))
@@ -379,18 +382,21 @@ class Connector:
                 [self.tx_cache.pop(h) for h in tx_bin_list]
             tx_rate = round(self.total_received_tx / (time.time() - self.start_time), 4)
-            if block["height"] % 1000 == 0:
-                self.log.info("Blocks %s; tx rate: %s;" % (block["height"], tx_rate))
+            if (self.total_received_tx - self.total_received_tx_stat) > 10000:
+                self.total_received_tx_stat = self.total_received_tx
+                self.log.warning("Blocks %s; tx rate: %s;" % (block["height"], tx_rate))
                 if self.utxo_data:
                     loading = "Loading ... " if self.cache_loading else ""
                     self.log.info(loading + "UTXO %s; hit rate: %s;" % (self.utxo.len(),
                                                                         self.utxo.hit_rate()))
-                self.log.info("Blocks download time %s;" % self.blocks_download_time)
-                self.log.info("Blocks decode time %s;" % self.blocks_decode_time)
-                self.log.info("Blocks non cached %s;" % self.non_cached_blocks)
-                self.log.warning("Blocks cache %s;" % self.block_preload.len())
-                self.log.warning("Blocks cache size %s;" % self.block_preload._store_size)
-                self.log.warning("Blocks cache last %s;" % self.block_preload.get_last_key())
+                self.log.info("Blocks downloaded %s; decoded %s" % (self.blocks_download_time,
+                                                                    self.blocks_decode_time))
+                if self.deep_synchronization:
+                    self.log.info("Synchronization:")
+                    self.log.info("Blocks not cached %s;" % self.non_cached_blocks)
+                    self.log.warning("Blocks cache count %s;" % self.block_preload.len())
+                    self.log.warning("Blocks cache size %s M;" % round(self.block_preload._store_size / 1024 / 1024, 2))

             # after block added handler
             if self.after_block_handler and not self.cache_loading:
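
This hunk switches progress reporting from a fixed block-height modulo to a transaction-count delta: a stats line is emitted only after more than 10,000 new transactions since the last report, which keeps log volume roughly uniform whether blocks are near-empty or packed during deep sync. A small standalone sketch of the same idea (names are illustrative):

import time

class TxStats:
    def __init__(self, threshold=10000):
        self.threshold = threshold
        self.total = 0           # mirrors total_received_tx
        self.last_reported = 0   # mirrors total_received_tx_stat
        self.start = time.time()

    def record(self, tx_count, height):
        self.total += tx_count
        if self.total - self.last_reported > self.threshold:
            self.last_reported = self.total  # move the checkpoint
            rate = round(self.total / (time.time() - self.start), 4)
            print("Blocks %s; tx rate: %s;" % (height, rate))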
@@ -454,23 +460,21 @@ class Connector:
         self.log.debug("Transactions received: %s [%s] received tx rate tx/s ->> %s <<" % (tx_count, time.time() - q, rate))

     async def verify_block_position(self, block):
-        if self.block_cache.get(block["hash"]) is not None:
-            self.log.error("duplicated block %s" % block["hash"])
-            raise Exception("duplicated block")
         if "previousblockhash" not in block :
             return
-        lb = self.block_cache.get_last_key()
-        if lb is None and not self.last_block_height:
+        if self.block_headers_cache.len() == 0:
             return
-        if self.block_cache.get_last_key() != block["previousblockhash"]:
-            if self.block_cache.get(block["previousblockhash"]) is None and self.last_block_height:
+        lb = self.block_headers_cache.get_last_key()
+        if self.block_headers_cache.get_last_key() != block["previousblockhash"]:
+            if self.block_headers_cache.get(block["previousblockhash"]) is None and self.last_block_height:
                 self.log.critical("Connector error! Node out of sync "
                                   "no parent block in chain tail %s" % block["previousblockhash"])
                 raise Exception("Node out of sync")
             if self.orphan_handler:
                 await self.orphan_handler(self.last_block_height)
-            self.block_cache.pop_last()
+            self.block_headers_cache.pop_last()
             self.last_block_height -= 1
             raise Exception("Sidebranch block removed")
@@ -618,7 +622,7 @@ class Connector:
         return stxo

-    async def preload_block_hashes(self):
+    async def preload_blocks(self):
         if self.block_hashes_preload_mutex:
             return
         try:
@@ -628,12 +632,10 @@ class Connector:
             processed_height = self.last_block_height
             while height < max_height:
-                if self.block_preload._store_size < 400 * 1000000:
+                if self.block_preload._store_size < self.block_preload_cache_limit * 0.9:
                     try:
                         if height < self.last_block_height:
                             height = self.last_block_height + 1
-                        # self.log.critical(str((height, processed_height, self.last_block_height,
-                        #                        self.block_hashes.len() )))
                         batch = list()
                         h_list = list()
                         while True:
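
The batch and h_list built here feed self.rpc.batch(), which sends many getblockhash calls in one round trip. The exact request format of the connector's rpc client is not shown in this diff, but Bitcoin Core itself accepts JSON-RPC batch arrays, so the payload presumably looks something like this hypothetical helper:

import json

def getblockhash_batch(start_height, count):
    # one HTTP request body carrying `count` getblockhash calls
    return json.dumps([
        {"jsonrpc": "2.0", "id": i,
         "method": "getblockhash", "params": [start_height + i]}
        for i in range(count)
    ])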
@@ -653,9 +655,7 @@ class Connector:
                                     h.append(lh)
                             except:
                                 pass
-                        # self.log.critical(str(( len(batch), )))
-                        # if not batch:
-                        #     self.log.critical(str((h_list, result)))
                         blocks = await self.rpc.batch(batch)
                         for x,y in zip(h,blocks):
@@ -673,41 +673,15 @@ class Connector:
                         for i in range(processed_height, self.last_block_height):
                             self.block_preload.remove(i)
                         processed_height = self.last_block_height
-                if self.block_preload._store_size < 400 * 1000000:
+                if self.block_preload._store_size < self.block_preload_cache_limit * 0.9:
                     continue
-                await asyncio.sleep(10)
+                self.log.critical(str((processed_height, self.last_block_height)))
+                await asyncio.sleep(1)
                 # remove unused items
         finally:
             self.block_hashes_preload_mutex = False

-    async def preload_block(self):
-        while True:
-            try:
-                start_height = self.last_block_height
-                height = start_height + 10
-                d = await self.rpc.getblockcount()
-                if d > height:
-                    while True:
-                        height += 1
-                        d = await self.rpc.getblockhash(height)
-                        ex = self.block_preload.get(d)
-                        if not ex:
-                            b = await self.rpc.getblock(d, 0)
-                            block = decode_block_tx(b)
-                            self.block_preload.set(d, block)
-                        if start_height + 15000 < height:
-                            break
-            except asyncio.CancelledError:
-                self.log.info("connector preload_block terminated")
-                break
-            except:
-                pass
-            await asyncio.sleep(15)

     async def stop(self):
         self.active = False
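
Taken together, preload_blocks now applies back-pressure against its own cache limit: it keeps fetching while block_preload holds under 90% of its byte budget, trims entries the main loop has already consumed, and otherwise naps for one second instead of ten. A condensed sketch of that loop shape (fetch_block, last_height and the cache object are illustrative stand-ins, not the connector's API):

import asyncio

async def preload_loop(cache, limit, last_height, fetch_block, max_height):
    height = last_height() + 1
    processed = last_height()
    while height < max_height:
        if cache._store_size < limit * 0.9:
            cache.set(height, await fetch_block(height))  # fill ahead
            height += 1
            # drop blocks the main loop has already processed
            for i in range(processed, last_height()):
                cache.remove(i)
            processed = last_height()
        else:
            await asyncio.sleep(1)  # cache near budget; back off briefly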