Compare commits

...

10 Commits

Author SHA1 Message Date
4tochka
8101a595f9 connector 2019-06-03 23:00:15 +04:00
4tochka
385acb4f49 connector 2019-06-03 22:47:33 +04:00
4tochka
1e031f2944 connector 2019-06-03 22:14:14 +04:00
4tochka
9c849994a3 connector 2019-06-03 22:05:52 +04:00
4tochka
53a858c56b connector 2019-06-03 22:01:28 +04:00
4tochka
3fa666b7ef connector 2019-06-03 21:25:08 +04:00
4tochka
f74327dae5 connector 2019-06-03 21:19:31 +04:00
4tochka
9e91e4decd connector 2019-06-03 17:34:52 +04:00
4tochka
14d8645132 connector 2019-06-03 17:25:19 +04:00
4tochka
5ef4c7c462 connector 2019-06-03 15:00:18 +04:00
3 changed files with 44 additions and 22 deletions

View File

@ -64,9 +64,9 @@ class BlockLoader:
self.rpc_batch_limit = 10
self.worker_tasks = [self.loop.create_task(self.start_worker(i)) for i in range(self.worker_limit)]
target_height = self.parent.node_last_block - self.parent.deep_sync_limit
height = self.parent.last_block_height + 1
self.height = self.parent.last_block_height + 1
while height < target_height:
while self.height < target_height:
new_requests = 0
if self.parent.block_preload._store_size < self.parent.block_preload_cache_limit:
try:
@ -83,12 +83,12 @@ class BlockLoader:
for i in self.worker_busy:
if not self.worker_busy[i]:
self.worker_busy[i] = True
if height <= self.parent.last_block_height:
height = self.parent.last_block_height + 1
if self.height <= self.parent.last_block_height:
self.height = self.parent.last_block_height + 1
await self.pipe_sent_msg(self.worker[i].writer, b'rpc_batch_limit',
int_to_bytes(self.rpc_batch_limit))
await self.pipe_sent_msg(self.worker[i].writer, b'get', int_to_bytes(height))
height += self.rpc_batch_limit
await self.pipe_sent_msg(self.worker[i].writer, b'get', int_to_bytes(self.height))
self.height += self.rpc_batch_limit
new_requests += 1
if not new_requests:
await asyncio.sleep(1)
@ -189,6 +189,8 @@ class BlockLoader:
blocks = pickle.loads(msg)
if blocks:
self.last_batch_size = len(msg)
else:
self.rpc_batch_limit = 20
for i in blocks:
self.parent.block_preload.set(i, blocks[i])
if blocks:
@ -198,7 +200,9 @@ class BlockLoader:
else:
self.parent.utxo.checkpoints.append(i)
if msg_type == b'failed':
self.height = bytes_to_int(msg)
continue
# def disconnect(self,ip):
# """ Disconnect peer """
# p = self.out_connection_pool[self.outgoing_connection[ip]["pool"]]
@ -324,6 +328,7 @@ class Worker:
self.log.critical("load blocks error")
self.log.critical(str(traceback.format_exc()))
await self.pipe_sent_msg(b'result', pickle.dumps([]))
await self.pipe_sent_msg(b'failed', pickle.dumps(height))
async def message_loop(self):
@ -343,6 +348,7 @@ class Worker:
if msg_type == b'rpc_batch_limit':
self.rpc_batch_limit = bytes_to_int(msg)
continue
except:
pass

View File

@ -93,6 +93,8 @@ class Connector:
self.total_received_tx_last = 0
self.start_time_last = time.time()
self.batch_time = time.time()
self.batch_load_utxo = 0
self.batch_parsing = 0
# cache and system
self.block_preload_cache_limit = block_preload_cache_limit
self.block_hashes_cache_limit = block_hashes_cache_limit
@ -340,6 +342,8 @@ class Connector:
if not block:
h = await self.rpc.getblockhash(self.last_block_height + 1)
block = await self._get_block_by_hash(h)
block["checkpoint"] = self.last_block_height + 1
block["height"] = self.last_block_height + 1
self.loop.create_task(self._new_block(block))
except Exception as err:
@ -440,6 +444,10 @@ class Connector:
if self.deep_synchronization:
self.log.debug("- Batch ---------------")
self.log.debug(" Rate %s; transactions %s" % (tx_rate_last, batch_tx_count))
self.log.debug(" Load utxo %s; parsing %s" % (self.batch_load_utxo,
self.batch_parsing))
self.batch_load_utxo = 0
self.batch_parsing = 0
self.log.debug("- Blocks --------------")
@ -448,9 +456,10 @@ class Connector:
"cache size %s M;" % (self.non_cached_blocks,
self.block_preload.len(),
round(self.block_preload._store_size / 1024 / 1024, 2)))
self.log.debug(" Cache first block %s; "
"cache last block %s;" % (next(iter(self.block_preload._store)),
next(reversed(self.block_preload._store))))
if self.block_preload._store:
self.log.debug(" Cache first block %s; "
"cache last block %s;" % (next(iter(self.block_preload._store)),
next(reversed(self.block_preload._store))))
self.log.debug(" Preload coins cache -> %s:%s [%s] "
"preload cache efficiency %s;" % (self.preload_cached,
self.preload_cached_annihilated,
@ -483,6 +492,7 @@ class Connector:
c,
int(self.utxo.read_from_db_time_total)))
self.utxo.read_from_db_batch_time = 0
self.utxo.read_from_db_time = 0
self.utxo.read_from_db_count = 0
self.log.debug("- Coins ---------------")
self.log.debug(" Coins %s; destroyed %s; "
@ -565,7 +575,8 @@ class Connector:
async def _block_as_transactions_batch(self, block):
try:
t2 = 0
t = time.time()
if self.utxo:
for q in block["rawTx"]:
tx = block["rawTx"][q]
@ -620,7 +631,10 @@ class Connector:
missed.add((outpoint, (block["height"] << 39) + (q << 20) + (1 << 19) + i, q, i))
if missed:
t2 = time.time()
await self.utxo.load_utxo()
t2 =time.time() - t2
self.batch_load_utxo += t2
for o, s, q, i in missed:
block["rawTx"][q]["vIn"][i]["coin"] = self.utxo.get_loaded(o)
c += 1
@ -631,6 +645,7 @@ class Connector:
self.total_received_tx += len(block["rawTx"])
self.total_received_tx_last += len(block["rawTx"])
self.batch_parsing += (time.time() - t) - t2
except Exception as err:
self.log.critical("new block error %s " % err)
self.log.critical(str(traceback.format_exc()))

View File

@ -600,18 +600,19 @@ class BlockDeserializeTests(unittest.TestCase):
f = open('./pybtc/test/raw_block.txt')
fc = f.readline()
qt = time.time()
bt = (
Block(fc[:-1], format="raw", keep_raw_tx=False),
)
bt = Block(fc[:-1], format="raw", keep_raw_tx=False)
print(len(bt["tx"]))
print([t["txId"] for t in bt["tx"].values()])
print(merkle_branches([t["txId"] for t in bt["tx"].values()]))
import ujson as pickle
qt = time.time()
k = pickle.dumps(bt[0]["tx"][0])
print("decoded block dump", time.time() - qt)
qt = time.time()
p = pickle.loads(k)
print("decoded block load", time.time() - qt)
print(p[0]["hash"])
# import pickle
# qt = time.time()
# k = pickle.dumps(bt[0]["tx"][0])
# print("decoded block dump", time.time() - qt)
# qt = time.time()
# p = pickle.loads(k)
# print("decoded block load", time.time() - qt)
# print(p)