connector

This commit is contained in:
4tochka 2019-05-19 11:32:09 +04:00
parent b5994bba4f
commit 9be276e20f
3 changed files with 44 additions and 34 deletions

View File

@ -44,13 +44,12 @@ class BlockLoader:
else:
# clear unused cache
if self.parent.block_preload._store:
self.log.warning(">>>" + str(next(iter(self.parent.block_preload._store))))
if next(iter(self.parent.block_preload._store)) <= self.parent.last_block_height:
for i in range(next(iter(self.parent.block_preload._store)),
self.parent.last_block_height + 1):
try: self.parent.block_preload.remove(i)
except: self.log.warning(str(traceback.format_exc()))
except: pass
except asyncio.CancelledError:
self.log.info("connector watchdog terminated")

View File

@ -361,8 +361,9 @@ class Connector:
self.cache_loading = True if self.last_block_height < self.app_block_height_on_start else False
if self.deep_synchronization:
tx_bin_list = [block["rawTx"][i]["txId"] for i in block["rawTx"]]
if not self.deep_synchronization:
if not self.block_batch_handler:
tx_bin_list = [block["rawTx"][i]["txId"] for i in block["rawTx"]]
else:
tx_bin_list = [s2rh(h) for h in block["tx"]]
await self.verify_block_position(block)
@ -374,27 +375,32 @@ class Connector:
else:
await self.fetch_block_transactions(block, tx_bin_list)
if self.block_handler and not self.cache_loading:
await self.block_handler(block)
self.block_headers_cache.set(block["hash"], block["height"])
self.last_block_height = block["height"]
if self.utxo_data:
checkpoint = self.utxo.checkpoint
try: self.utxo.checkpoints.append(block["checkpoint"])
except: pass
if len(self.utxo.cached) > self.utxo.size_limit and \
not self.utxo.save_process and \
self.utxo.checkpoints:
# n = list()
# for i in self.utxo.checkpoints:
# if i >= block["height"]:
# n.append(i)
# self.utxo.checkpoints = n
if self.utxo.checkpoints[0] < block["height"]:
self.utxo.deleted_last_block = block["height"]
self.utxo.pending_deleted = self.utxo.pending_deleted | self.utxo.deleted
self.utxo.deleted = set()
self.loop.create_task(self.utxo.save_utxo())
self.utxo.create_checkpoint()
else:
checkpoint = None
if self.block_batch_handler and not self.cache_loading:
await self.block_batch_handler(block, checkpoint)
if self.block_handler and not self.cache_loading:
await self.block_handler(block)
self.block_headers_cache.set(block["hash"], block["height"])
self.last_block_height = block["height"]
if self.utxo_data and self.utxo.save_process:
self.loop.create_task(self.utxo.save_utxo())
self.blocks_processed_count += 1
@ -583,9 +589,6 @@ class Connector:
self.log.critical("utxo get failed " + rh2s(block["hash"]))
raise Exception("utxo get failed ")
if self.block_batch_handler and not self.cache_loading:
await self.block_batch_handler(block)
self.total_received_tx += len(block["rawTx"])
except Exception as err:
self.log.critical("new block error %s " % err)

View File

@ -2,6 +2,7 @@ from pybtc import int_to_c_int, c_int_to_int, c_int_len
import asyncio
from collections import OrderedDict
from pybtc import MRU
import traceback
class UTXO():
def __init__(self, db_pool, loop, log, cache_size):
@ -9,6 +10,7 @@ class UTXO():
self.missed = set()
self.deleted = set()
self.pending_deleted = set()
self.pending_utxo = set()
self.checkpoints = list()
self.log = log
self.loaded = MRU()
@ -43,21 +45,19 @@ class UTXO():
del self.cached[outpoint]
async def save_utxo(self):
def create_checkpoint(self):
# save to db tail from cache
if self.save_process or not self.cached: return
if not self.checkpoints: return
self.save_process = True
try:
# self.log.critical("cached " + str(len(self.cached)) )
i = self.cached.peek_last_item()
self.checkpoints = sorted(self.checkpoints)
checkpoint = self.checkpoints.pop(0)
lb = 0
block_changed = False
checkpoint_found = False
utxo = set()
while self.cached:
i = self.cached.pop()
if lb != i[1][0] >> 42:
@ -77,19 +77,26 @@ class UTXO():
if len(self.cached) <= self.size_limit:
if block_changed and checkpoint_found:
break
utxo.add((i[0],b"".join((int_to_c_int(i[1][0]),
self.pending_utxo.add((i[0],b"".join((int_to_c_int(i[1][0]),
int_to_c_int(i[1][1]),
i[1][2]))))
self.pending_saved[i[0]] = i[1]
if block_changed:
self.cached.append({i[0]: i[1]})
lb -= 1
if not checkpoint_found:
for i in reversed(self.pending_saved):
self.cached.append({i: self.pending_saved[i]})
self.log.critical("checkpoint not found " +str(lb) +" > "+ str(self.checkpoints))
await asyncio.sleep(5)
return
self.checkpoint = lb if checkpoint_found else None
except:
self.log.critical("create checkpoint error")
self.log.critical(str(traceback.format_exc()))
async def save_checkpoint(self):
# save to db tail from cache
if not self.checkpoint: return
try:
if not self.checkpoint: return
async with self._db_pool.acquire() as conn:
@ -97,21 +104,22 @@ class UTXO():
if self.pending_deleted:
await conn.execute("DELETE FROM connector_utxo WHERE "
"outpoint = ANY($1);", self.pending_deleted)
if utxo:
if self.pending_utxo:
await conn.copy_records_to_table('connector_utxo',
columns=["outpoint", "data"], records=utxo)
columns=["outpoint", "data"], records=self.pending_utxo)
await conn.execute("UPDATE connector_utxo_state SET value = $1 "
"WHERE name = 'last_block';", lb)
await conn.execute("UPDATE connector_utxo_state SET value = $1 "
"WHERE name = 'last_cached_block';", self.deleted_last_block)
self.saved_utxo += len(utxo)
self.saved_utxo += len(self.pending_utxo)
self.deleted_utxo += len(self.pending_deleted)
self.pending_deleted = set()
self.pending_utxo = set()
self.last_saved_block = lb
self.last_saved_block = self.checkpoint
self.checkpoint = None
except:
import traceback
self.log.critical("implement rollback ")
self.log.critical(str(traceback.format_exc()))
finally: