Compare commits
10 Commits
a5e8a3f2b7
...
8101a595f9
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8101a595f9 | ||
|
|
385acb4f49 | ||
|
|
1e031f2944 | ||
|
|
9c849994a3 | ||
|
|
53a858c56b | ||
|
|
3fa666b7ef | ||
|
|
f74327dae5 | ||
|
|
9e91e4decd | ||
|
|
14d8645132 | ||
|
|
5ef4c7c462 |
@ -64,9 +64,9 @@ class BlockLoader:
|
||||
self.rpc_batch_limit = 10
|
||||
self.worker_tasks = [self.loop.create_task(self.start_worker(i)) for i in range(self.worker_limit)]
|
||||
target_height = self.parent.node_last_block - self.parent.deep_sync_limit
|
||||
height = self.parent.last_block_height + 1
|
||||
self.height = self.parent.last_block_height + 1
|
||||
|
||||
while height < target_height:
|
||||
while self.height < target_height:
|
||||
new_requests = 0
|
||||
if self.parent.block_preload._store_size < self.parent.block_preload_cache_limit:
|
||||
try:
|
||||
@ -83,12 +83,12 @@ class BlockLoader:
|
||||
for i in self.worker_busy:
|
||||
if not self.worker_busy[i]:
|
||||
self.worker_busy[i] = True
|
||||
if height <= self.parent.last_block_height:
|
||||
height = self.parent.last_block_height + 1
|
||||
if self.height <= self.parent.last_block_height:
|
||||
self.height = self.parent.last_block_height + 1
|
||||
await self.pipe_sent_msg(self.worker[i].writer, b'rpc_batch_limit',
|
||||
int_to_bytes(self.rpc_batch_limit))
|
||||
await self.pipe_sent_msg(self.worker[i].writer, b'get', int_to_bytes(height))
|
||||
height += self.rpc_batch_limit
|
||||
await self.pipe_sent_msg(self.worker[i].writer, b'get', int_to_bytes(self.height))
|
||||
self.height += self.rpc_batch_limit
|
||||
new_requests += 1
|
||||
if not new_requests:
|
||||
await asyncio.sleep(1)
|
||||
@ -189,6 +189,8 @@ class BlockLoader:
|
||||
blocks = pickle.loads(msg)
|
||||
if blocks:
|
||||
self.last_batch_size = len(msg)
|
||||
else:
|
||||
self.rpc_batch_limit = 20
|
||||
for i in blocks:
|
||||
self.parent.block_preload.set(i, blocks[i])
|
||||
if blocks:
|
||||
@ -198,7 +200,9 @@ class BlockLoader:
|
||||
else:
|
||||
self.parent.utxo.checkpoints.append(i)
|
||||
|
||||
|
||||
if msg_type == b'failed':
|
||||
self.height = bytes_to_int(msg)
|
||||
continue
|
||||
# def disconnect(self,ip):
|
||||
# """ Disconnect peer """
|
||||
# p = self.out_connection_pool[self.outgoing_connection[ip]["pool"]]
|
||||
@ -324,6 +328,7 @@ class Worker:
|
||||
self.log.critical("load blocks error")
|
||||
self.log.critical(str(traceback.format_exc()))
|
||||
await self.pipe_sent_msg(b'result', pickle.dumps([]))
|
||||
await self.pipe_sent_msg(b'failed', pickle.dumps(height))
|
||||
|
||||
|
||||
async def message_loop(self):
|
||||
@ -343,6 +348,7 @@ class Worker:
|
||||
if msg_type == b'rpc_batch_limit':
|
||||
self.rpc_batch_limit = bytes_to_int(msg)
|
||||
continue
|
||||
|
||||
except:
|
||||
pass
|
||||
|
||||
|
||||
@ -93,6 +93,8 @@ class Connector:
|
||||
self.total_received_tx_last = 0
|
||||
self.start_time_last = time.time()
|
||||
self.batch_time = time.time()
|
||||
self.batch_load_utxo = 0
|
||||
self.batch_parsing = 0
|
||||
# cache and system
|
||||
self.block_preload_cache_limit = block_preload_cache_limit
|
||||
self.block_hashes_cache_limit = block_hashes_cache_limit
|
||||
@ -340,6 +342,8 @@ class Connector:
|
||||
if not block:
|
||||
h = await self.rpc.getblockhash(self.last_block_height + 1)
|
||||
block = await self._get_block_by_hash(h)
|
||||
block["checkpoint"] = self.last_block_height + 1
|
||||
block["height"] = self.last_block_height + 1
|
||||
|
||||
self.loop.create_task(self._new_block(block))
|
||||
except Exception as err:
|
||||
@ -440,6 +444,10 @@ class Connector:
|
||||
if self.deep_synchronization:
|
||||
self.log.debug("- Batch ---------------")
|
||||
self.log.debug(" Rate %s; transactions %s" % (tx_rate_last, batch_tx_count))
|
||||
self.log.debug(" Load utxo %s; parsing %s" % (self.batch_load_utxo,
|
||||
self.batch_parsing))
|
||||
self.batch_load_utxo = 0
|
||||
self.batch_parsing = 0
|
||||
|
||||
self.log.debug("- Blocks --------------")
|
||||
|
||||
@ -448,9 +456,10 @@ class Connector:
|
||||
"cache size %s M;" % (self.non_cached_blocks,
|
||||
self.block_preload.len(),
|
||||
round(self.block_preload._store_size / 1024 / 1024, 2)))
|
||||
self.log.debug(" Cache first block %s; "
|
||||
"cache last block %s;" % (next(iter(self.block_preload._store)),
|
||||
next(reversed(self.block_preload._store))))
|
||||
if self.block_preload._store:
|
||||
self.log.debug(" Cache first block %s; "
|
||||
"cache last block %s;" % (next(iter(self.block_preload._store)),
|
||||
next(reversed(self.block_preload._store))))
|
||||
self.log.debug(" Preload coins cache -> %s:%s [%s] "
|
||||
"preload cache efficiency %s;" % (self.preload_cached,
|
||||
self.preload_cached_annihilated,
|
||||
@ -483,6 +492,7 @@ class Connector:
|
||||
c,
|
||||
int(self.utxo.read_from_db_time_total)))
|
||||
self.utxo.read_from_db_batch_time = 0
|
||||
self.utxo.read_from_db_time = 0
|
||||
self.utxo.read_from_db_count = 0
|
||||
self.log.debug("- Coins ---------------")
|
||||
self.log.debug(" Coins %s; destroyed %s; "
|
||||
@ -565,7 +575,8 @@ class Connector:
|
||||
|
||||
async def _block_as_transactions_batch(self, block):
|
||||
try:
|
||||
|
||||
t2 = 0
|
||||
t = time.time()
|
||||
if self.utxo:
|
||||
for q in block["rawTx"]:
|
||||
tx = block["rawTx"][q]
|
||||
@ -620,7 +631,10 @@ class Connector:
|
||||
missed.add((outpoint, (block["height"] << 39) + (q << 20) + (1 << 19) + i, q, i))
|
||||
|
||||
if missed:
|
||||
t2 = time.time()
|
||||
await self.utxo.load_utxo()
|
||||
t2 =time.time() - t2
|
||||
self.batch_load_utxo += t2
|
||||
for o, s, q, i in missed:
|
||||
block["rawTx"][q]["vIn"][i]["coin"] = self.utxo.get_loaded(o)
|
||||
c += 1
|
||||
@ -631,6 +645,7 @@ class Connector:
|
||||
|
||||
self.total_received_tx += len(block["rawTx"])
|
||||
self.total_received_tx_last += len(block["rawTx"])
|
||||
self.batch_parsing += (time.time() - t) - t2
|
||||
except Exception as err:
|
||||
self.log.critical("new block error %s " % err)
|
||||
self.log.critical(str(traceback.format_exc()))
|
||||
|
||||
@ -600,18 +600,19 @@ class BlockDeserializeTests(unittest.TestCase):
|
||||
f = open('./pybtc/test/raw_block.txt')
|
||||
fc = f.readline()
|
||||
qt = time.time()
|
||||
bt = (
|
||||
Block(fc[:-1], format="raw", keep_raw_tx=False),
|
||||
)
|
||||
bt = Block(fc[:-1], format="raw", keep_raw_tx=False)
|
||||
print(len(bt["tx"]))
|
||||
print([t["txId"] for t in bt["tx"].values()])
|
||||
print(merkle_branches([t["txId"] for t in bt["tx"].values()]))
|
||||
|
||||
import ujson as pickle
|
||||
qt = time.time()
|
||||
k = pickle.dumps(bt[0]["tx"][0])
|
||||
print("decoded block dump", time.time() - qt)
|
||||
qt = time.time()
|
||||
p = pickle.loads(k)
|
||||
print("decoded block load", time.time() - qt)
|
||||
print(p[0]["hash"])
|
||||
# import pickle
|
||||
# qt = time.time()
|
||||
# k = pickle.dumps(bt[0]["tx"][0])
|
||||
# print("decoded block dump", time.time() - qt)
|
||||
# qt = time.time()
|
||||
# p = pickle.loads(k)
|
||||
# print("decoded block load", time.time() - qt)
|
||||
# print(p)
|
||||
|
||||
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user