connector

This commit is contained in:
4tochka 2019-05-12 19:27:38 +04:00
parent fcc83e69da
commit 471b1cd8a4

View File

@ -19,8 +19,7 @@ class UTXO():
self.clear_tail = False
self.last_saved_block = 0
self.last_cached_block = 0
self.save_future = asyncio.Future()
self.save_future.set_result(True)
self.save_process = False
self.load_utxo_future = asyncio.Future()
self.load_utxo_future.set_result(True)
self._requests = 0
@ -60,71 +59,83 @@ class UTXO():
# save to db tail from cache
self.log.critical("save utxo>>>>")
return
if not self.save_future.done():
await self.save_future.done()
if self.save_process or not self.cached:
return
self.save_future = asyncio.Future()
while True:
c = len(self.cached) - self._cache_soft_limit - self.block_txo_max
if c <= 0: break
self.log.critical("str>>>>")
try:
lb = 0
for key in iter(self.cached):
i = self.cached[key]
if c>0 and (i[0] >> 42) <= block_height:
c -= 1
lb = i[0] >> 42
continue
break
self.save_process = True
try:
lb = 0
block_changed = False
utxo = set()
while self.cached:
i = self.cached.pop()
if lb != i[1][0] >> 42:
block_changed = True
lb = i[1][0] >> 42
if self.cached <= self.size_limit:
if block_changed:
break
utxo.add((i[0],b"".join((int_to_c_int(i[1][0]),
int_to_c_int(i[1][1]),
i[1][2]))))
if block_changed:
self.cached.append({i[0]: i[1]})
if lb:
d = set()
for key in range(self.last_saved_block + 1, lb + 1):
try:
[d.add(i) for i in self.deleted[key]]
except:
pass
#
# block_height
# for key in iter(self.cached):
# i = self.cached[key]
# if c>0 and (i[0] >> 42) <= block_height:
# c -= 1
# lb = i[0] >> 42
# continue
# break
#
# if lb:
# d = set()
# for key in range(self.last_saved_block + 1, lb + 1):
# try:
# [d.add(i) for i in self.deleted[key]]
# except:
# pass
#
# a = set()
# for key in iter(self.cached):
# i = self.cached[key]
# if (i[0] >> 42) > lb: break
# a.add((key,b"".join((int_to_c_int(i[0]),
# int_to_c_int(i[1]),
# i[2]))))
a = set()
for key in iter(self.cached):
i = self.cached[key]
if (i[0] >> 42) > lb: break
a.add((key,b"".join((int_to_c_int(i[0]),
int_to_c_int(i[1]),
i[2]))))
# insert to db
d = set()
async with self._db_pool.acquire() as conn:
async with conn.transaction():
if d:
await conn.execute("DELETE FROM connector_utxo WHERE "
"outpoint = ANY($1);", d)
if a:
await conn.copy_records_to_table('connector_utxo',
columns=["outpoint", "data"], records=utxo)
await conn.execute("UPDATE connector_utxo_state SET value = $1 "
"WHERE name = 'last_block';", lb)
self.saved_utxo += len(utxo)
self.deleted_utxo += len(d)
# insert to db
async with self._db_pool.acquire() as conn:
async with conn.transaction():
if d:
await conn.execute("DELETE FROM connector_utxo WHERE "
"outpoint = ANY($1);", d)
if a:
await conn.copy_records_to_table('connector_utxo',
columns=["outpoint", "data"], records=a)
await conn.execute("UPDATE connector_utxo_state SET value = $1 "
"WHERE name = 'last_block';", lb)
await conn.execute("UPDATE connector_utxo_state SET value = $1 "
"WHERE name = 'last_cached_block';", block_height)
self.saved_utxo += len(a)
self.deleted_utxo += len(d)
# remove from cache
for key in a:
try:
self.cached.pop(key[0])
except:
pass
for key in range(self.last_saved_block + 1, lb + 1):
try:
self.deleted.pop(key)
except:
pass
self.last_saved_block = lb
finally:
self.save_future = False
# # remove from cache
# for key in a:
# try:
# self.cached.pop(key[0])
# except:
# pass
#
# for key in range(self.last_saved_block + 1, lb + 1):
# try:
# self.deleted.pop(key)
# except:
# pass
self.last_saved_block = lb
finally:
self.save_future = False
def get(self, key):
self._requests += 1