connector

This commit is contained in:
4tochka 2019-05-07 00:19:39 +04:00
parent 24ede60fbd
commit 7b21405480
9 changed files with 540 additions and 368 deletions

View File

@ -7,5 +7,5 @@ from .block import *
from .address import * from .address import *
from .wallet import * from .wallet import *
from .crypto import * from .crypto import *
from .connector import Connector from pybtc.connector import Connector

View File

@ -52,11 +52,11 @@ class Block(dict):
block_target = int.from_bytes(self["hash"], byteorder="little") block_target = int.from_bytes(self["hash"], byteorder="little")
self["difficulty"] = target_to_difficulty(block_target) self["difficulty"] = target_to_difficulty(block_target)
tx_count = var_int_to_int(read_var_int(s)) tx_count = var_int_to_int(read_var_int(s))
self["tx"] = {i: Transaction(s, format="raw", keep_raw_tx=keep_raw_tx) self["tx"] = dict()
for i in range(tx_count)} for i in range(tx_count):
for t in self["tx"].values(): self["tx"][i] = Transaction(s, format="raw", keep_raw_tx=keep_raw_tx)
self["amount"] += t["amount"] self["amount"] += self["tx"][i]["amount"]
self["strippedSize"] += t["bSize"] self["strippedSize"] += self["tx"][i]["bSize"]
self["strippedSize"] += var_int_len(tx_count) self["strippedSize"] += var_int_len(tx_count)
self["weight"] = self["strippedSize"] * 3 + self["size"] self["weight"] = self["strippedSize"] * 3 + self["size"]
if format == "decoded": if format == "decoded":

View File

@ -0,0 +1 @@
from .connector import *

View File

@ -0,0 +1,171 @@
import asyncio
import os
from multiprocessing import Process
from concurrent.futures import ThreadPoolExecutor
from setproctitle import setproctitle
import logging
import signal
import sys
class BlockLoader:
def __init__(self, parent, workers=4):
self.worker = list()
self.log = parent.log
self.loop = parent.loop
self.loop.set_default_executor(ThreadPoolExecutor(workers * 2))
[self.loop.create_task(self.start_worker(i)) for i in range(workers)]
async def start_worker(self,index):
self.log.warning('Start block loader worker %s' % index)
# prepare pipes for communications
in_reader, in_writer = os.pipe()
out_reader, out_writer = os.pipe()
in_reader, out_reader = os.fdopen(in_reader,'rb'), os.fdopen(out_reader,'rb')
in_writer, out_writer = os.fdopen(in_writer,'wb'), os.fdopen(out_writer,'wb')
# create new process
worker = Process(target=Worker, args=(index, in_reader, in_writer, out_reader, out_writer))
worker.start()
in_reader.close()
out_writer.close()
# get stream reader
worker.reader = await self.get_pipe_reader(out_reader)
worker.writer = in_writer
worker.name = str(index)
self.worker[index] = worker
# start message loop
self.loop.create_task(self.message_loop(self.worker[index]))
# wait if process crash
await self.loop.run_in_executor(None, worker.join)
del self.worker[index]
self.log.warning('Block loader worker %s is stopped' % index)
async def get_pipe_reader(self, fd_reader):
reader = asyncio.StreamReader()
protocol = asyncio.StreamReaderProtocol(reader)
try:
await self.loop.connect_read_pipe(lambda: protocol, fd_reader)
except:
return None
return reader
async def pipe_get_msg(self, reader):
while True:
try:
msg = await reader.readexactly(1)
if msg == b'M':
msg = await reader.readexactly(1)
if msg == b'E':
msg = await reader.readexactly(4)
c = int.from_bytes(msg, byteorder='little')
msg = await reader.readexactly(c)
if msg:
return msg[:20].rstrip(), msg[20:]
if not msg:
return b'pipe_read_error', b''
except:
return b'pipe_read_error', b''
def pipe_sent_msg(self, writer, msg_type, msg):
msg_type = msg_type[:20].ljust(20)
msg = msg_type + msg
msg = b''.join((b'ME', len(msg).to_bytes(4, byteorder='little'), msg))
writer.write(msg)
writer.flush()
async def message_loop(self, worker):
while True:
msg_type, msg = await self.pipe_get_msg(worker.reader)
if msg_type == b'pipe_read_error':
if not worker.is_alive():
return
continue
if msg_type == b'result':
msg
continue
# def disconnect(self,ip):
# """ Disconnect peer """
# p = self.out_connection_pool[self.outgoing_connection[ip]["pool"]]
# pipe_sent_msg(p.writer, b'disconnect', ip.encode())
class Worker:
def __init__(self, name , in_reader, in_writer, out_reader, out_writer):
setproctitle('Block loader: worker %s' % name)
self.name = name
in_writer.close()
out_reader.close()
policy = asyncio.get_event_loop_policy()
policy.set_event_loop(policy.new_event_loop())
self.loop = asyncio.get_event_loop()
self.log = logging.getLogger("Block loader")
self.log.setLevel(logging.INFO)
self.loop.set_default_executor(ThreadPoolExecutor(20))
self.out_writer = out_writer
self.in_reader = in_reader
signal.signal(signal.SIGTERM, self.terminate)
self.loop.create_task(self.message_loop())
self.loop.run_forever()
def terminate(self,a,b):
sys.exit(0)
async def get_pipe_reader(self, fd_reader):
reader = asyncio.StreamReader()
protocol = asyncio.StreamReaderProtocol(reader)
try:
await self.loop.connect_read_pipe(lambda: protocol, fd_reader)
except:
return None
return reader
async def pipe_get_msg(self, reader):
while True:
try:
msg = await reader.readexactly(1)
if msg == b'M':
msg = await reader.readexactly(1)
if msg == b'E':
msg = await reader.readexactly(4)
c = int.from_bytes(msg, byteorder='little')
msg = await reader.readexactly(c)
if msg:
return msg[:20].rstrip(), msg[20:]
if not msg:
return b'pipe_read_error', b''
except:
return b'pipe_read_error', b''
def pipe_sent_msg(self, writer, msg_type, msg):
msg_type = msg_type[:20].ljust(20)
msg = msg_type + msg
msg = b''.join((b'ME', len(msg).to_bytes(4, byteorder='little'), msg))
writer.write(msg)
writer.flush()
async def message_loop(self, worker):
while True:
msg_type, msg = await self.pipe_get_msg(worker.reader)
if msg_type == b'pipe_read_error':
if not worker.is_alive():
return
continue
if msg_type == b'result':
msg
continue

View File

@ -1,21 +1,18 @@
from pybtc.functions.tools import rh2s, s2rh from pybtc.functions.tools import rh2s, s2rh
from pybtc.functions.tools import var_int_to_int, var_int_len from pybtc.connector.block_loader import BlockLoader
from pybtc.functions.tools import read_var_int from pybtc.connector.utxo import UTXO
from pybtc.functions.hash import double_sha256 from pybtc.connector.utils import decode_block_tx
from pybtc.connector.utils import Cache
from pybtc.transaction import Transaction from pybtc.transaction import Transaction
from pybtc import int_to_c_int, c_int_to_int, c_int_len, int_to_bytes from pybtc import int_to_bytes
from pybtc.functions.block import bits_to_target, target_to_difficulty
from struct import unpack, pack
import sys
import traceback import traceback
import aiojsonrpc import aiojsonrpc
import zmq import zmq
import zmq.asyncio import zmq.asyncio
import asyncio import asyncio
import time import time
import io
from collections import OrderedDict
from lru import LRU
class Connector: class Connector:
def __init__(self, node_rpc_url, node_zerromq_url, logger, def __init__(self, node_rpc_url, node_zerromq_url, logger,
@ -36,6 +33,8 @@ class Connector:
# settings # settings
self.log = logger self.log = logger
self.rpc_url = node_rpc_url self.rpc_url = node_rpc_url
self.rpc_timeout = rpc_timeout
self.rpc_batch_limit = rpc_batch_limit
self.zmq_url = node_zerromq_url self.zmq_url = node_zerromq_url
self.orphan_handler = orphan_handler self.orphan_handler = orphan_handler
self.block_timeout = block_timeout self.block_timeout = block_timeout
@ -51,8 +50,7 @@ class Connector:
self.utxo_cache_size = utxo_cache_size self.utxo_cache_size = utxo_cache_size
self.utxo_data = utxo_data self.utxo_data = utxo_data
self.chain_tail = list(chain_tail) if chain_tail else [] self.chain_tail = list(chain_tail) if chain_tail else []
self.rpc_timeout = rpc_timeout
self.batch_limit = rpc_batch_limit
# state and stats # state and stats
self.node_last_block = None self.node_last_block = None
@ -151,13 +149,11 @@ class Connector:
for row in reversed(self.chain_tail): for row in reversed(self.chain_tail):
self.block_headers_cache.set(row, h) self.block_headers_cache.set(row, h)
h -= 1 h -= 1
self.block_loader = BlockLoader(self)
self.tasks.append(self.loop.create_task(self.zeromq_handler())) self.tasks.append(self.loop.create_task(self.zeromq_handler()))
self.tasks.append(self.loop.create_task(self.watchdog())) self.tasks.append(self.loop.create_task(self.watchdog()))
self.connected.set_result(True) self.connected.set_result(True)
# if self.preload:
# self.loop.create_task(self.preload_block())
# self.loop.create_task(self.preload_block_hashes())
self.get_next_block_mutex = True self.get_next_block_mutex = True
self.loop.create_task(self.get_next_block()) self.loop.create_task(self.get_next_block())
@ -303,12 +299,8 @@ class Connector:
self.deep_synchronization = False self.deep_synchronization = False
if self.deep_synchronization: if self.deep_synchronization:
raw_block = self.block_preload.pop(self.last_block_height + 1) block = self.block_preload.pop(self.last_block_height + 1)
if raw_block: if not block:
q = time.time()
block = decode_block_tx(raw_block)
self.blocks_decode_time += time.time() - q
else:
h = self.block_hashes.pop(self.last_block_height + 1) h = self.block_hashes.pop(self.last_block_height + 1)
if h is None: if h is None:
h = await self.rpc.getblockhash(self.last_block_height + 1) h = await self.rpc.getblockhash(self.last_block_height + 1)
@ -559,7 +551,7 @@ class Connector:
batch = list() batch = list()
while self.missed_tx: while self.missed_tx:
batch.append(["getrawtransaction", self.missed_tx.pop()]) batch.append(["getrawtransaction", self.missed_tx.pop()])
if len(batch) >= self.batch_limit: if len(batch) >= self.rpc_batch_limit:
break break
result = await self.rpc.batch(batch) result = await self.rpc.batch(batch)
for r in result: for r in result:
@ -665,6 +657,7 @@ class Connector:
async def preload_blocks(self): async def preload_blocks(self):
return
if self.block_hashes_preload_mutex: if self.block_hashes_preload_mutex:
return return
try: try:
@ -683,7 +676,7 @@ class Connector:
while True: while True:
batch.append(["getblockhash", height]) batch.append(["getblockhash", height])
h_list.append(height) h_list.append(height)
if len(batch) >= self.batch_limit or height >= max_height: if len(batch) >= self.rpc_batch_limit or height >= max_height:
height += 1 height += 1
break break
height += 1 height += 1
@ -698,19 +691,19 @@ class Connector:
except: except:
pass pass
blocks = await self.rpc.batch(batch) blocks = await self.block_loader.load_blocks(batch)
for x,y in zip(h,blocks): for x,y in zip(h,blocks):
try: try:
self.block_preload.set(x, (y["result"])) self.block_preload.set(x, y)
except: except:
pass pass
except asyncio.CancelledError: except asyncio.CancelledError:
self.log.info("connector preload_block_hashes failed") self.log.info("connector preload_block_hashes failed")
break break
except: except:
pass pass
if processed_height < self.last_block_height: if processed_height < self.last_block_height:
for i in range(processed_height, self.last_block_height ): for i in range(processed_height, self.last_block_height ):
try: try:
@ -749,334 +742,5 @@ class Connector:
self.log.warning('Node connector terminated') self.log.warning('Node connector terminated')
class UTXO():
def __init__(self, db_pool, loop, log, cache_size):
self.cached = LRU(cache_size)
self.missed = set()
self.destroyed = LRU(200000)
self.deleted = LRU(200000)
self.log = log
self.loaded = OrderedDict()
self.maturity = 100
self._cache_size = cache_size
self._db_pool = db_pool
self.loop = loop
self.clear_tail = False
self.last_saved_block = 0
self.last_cached_block = 0
self.save_process = False
self.load_utxo_future = asyncio.Future()
self.load_utxo_future.set_result(True)
self._requests = 0
self._failed_requests = 0
self._hit = 0
self.saved_utxo = 0
self.deleted_utxo = 0
self.deleted_utxo_saved = 0
self.loaded_utxo = 0
self.destroyed_utxo = 0
self.destroyed_utxo_block = 0
self.outs_total = 0
def set(self, outpoint, pointer, amount, address):
self.cached[outpoint] = (pointer, amount, address)
self.outs_total += 1
if pointer:
self.last_cached_block = pointer >> 42
def remove(self, outpoint):
del self.cached[outpoint]
def destroy_utxo(self, block_height):
return
block_height -= self.maturity
for key in range(self.destroyed_utxo_block + 1, block_height + 1):
if key not in self.destroyed: continue
n = set()
for outpoint in self.destroyed[key]:
try:
self.cached.pop(outpoint)
self.destroyed_utxo += 1
except:
try:
del self.loaded[outpoint]
self.destroyed_utxo += 1
n.add(outpoint)
except:
self.destroyed_utxo += 1
pass
self.deleted[key] = n
self.destroyed.pop(key)
self.destroyed_utxo_block = block_height
if len(self.cached) - self._cache_size > 0 and not self.save_process:
self.loop.create_task(self.save_utxo(block_height))
async def save_utxo(self, block_height):
# save to db tail from cache
self.save_process = True
await asyncio.sleep(2)
c = len(self.cached) - self._cache_size
try:
lb = 0
for key in iter(self.cached):
i = self.cached[key]
if c>0 and (i[0] >> 42) <= block_height:
c -= 1
lb = i[0] >> 42
continue
break
if lb:
d = set()
for key in range(self.last_saved_block + 1, lb + 1):
try:
[d.add(i) for i in self.deleted[key]]
except:
pass
a = set()
for key in iter(self.cached):
i = self.cached[key]
if (i[0] >> 42) > lb: break
a.add((key,b"".join((int_to_c_int(i[0]),
int_to_c_int(i[1]),
i[2]))))
# insert to db
async with self._db_pool.acquire() as conn:
async with conn.transaction():
if d:
await conn.execute("DELETE FROM connector_utxo WHERE "
"outpoint = ANY($1);", d)
if a:
await conn.copy_records_to_table('connector_utxo',
columns=["outpoint", "data"], records=a)
await conn.execute("UPDATE connector_utxo_state SET value = $1 "
"WHERE name = 'last_block';", lb)
await conn.execute("UPDATE connector_utxo_state SET value = $1 "
"WHERE name = 'last_cached_block';", block_height)
self.saved_utxo += len(a)
self.deleted_utxo += len(d)
# remove from cache
for key in a:
try:
self.cached.pop(key[0])
except:
pass
for key in range(self.last_saved_block + 1, lb + 1):
try:
self.deleted.pop(key)
except:
pass
self.last_saved_block = lb
finally:
self.save_process = False
def get(self, key, block_height):
self._requests += 1
try:
i = self.cached.get(key)
del self.cached[key]
self.destroyed_utxo += 1
# try:
# self.destroyed[block_height].add(key)
# except:
# self.destroyed[block_height] = {key}
self._hit += 1
return i
except:
self._failed_requests += 1
self.missed.add(key)
return None
def get_loaded(self, key, block_height):
try:
i = self.loaded[key]
try:
self.destroyed[block_height].add(key)
except:
self.destroyed[block_height] = {key}
return i
except:
return None
async def load_utxo(self):
while True:
if not self.load_utxo_future.done():
await self.load_utxo_future
continue
break
try:
self.load_utxo_future = asyncio.Future()
l = set(self.missed)
async with self._db_pool.acquire() as conn:
rows = await conn.fetch("SELECT outpoint, connector_utxo.data "
"FROM connector_utxo "
"WHERE outpoint = ANY($1);", l)
for i in l:
try:
self.missed.remove(i)
except:
pass
for row in rows:
d = row["data"]
pointer = c_int_to_int(d)
f = c_int_len(pointer)
amount = c_int_to_int(d[f:])
f += c_int_len(amount)
address = d[f:]
self.loaded[row["outpoint"]] = (pointer, amount, address)
self.loaded_utxo += 1
finally:
self.load_utxo_future.set_result(True)
def len(self):
return len(self.cached)
def hit_rate(self):
if self._requests:
return self._hit / self._requests
else:
return 0
def get_stream(stream):
if not isinstance(stream, io.BytesIO):
if isinstance(stream, str):
stream = bytes.fromhex(stream)
if isinstance(stream, bytes):
stream = io.BytesIO(stream)
else:
raise TypeError("object should be bytes or HEX encoded string")
return stream
def decode_block_tx(block):
s = get_stream(block)
b = dict()
b["amount"] = 0
b["strippedSize"] = 80
b["version"] = unpack("<L", s.read(4))[0]
b["versionHex"] = pack(">L", b["version"]).hex()
b["previousBlockHash"] = rh2s(s.read(32))
b["merkleRoot"] = rh2s(s.read(32))
b["time"] = unpack("<L", s.read(4))[0]
b["bits"] = s.read(4)
b["target"] = bits_to_target(unpack("<L", b["bits"])[0])
b["targetDifficulty"] = target_to_difficulty(b["target"])
b["target"] = b["target"].to_bytes(32, byteorder="little")
b["nonce"] = unpack("<L", s.read(4))[0]
s.seek(-80, 1)
b["header"] = s.read(80).hex()
b["bits"] = rh2s(b["bits"])
b["target"] = rh2s(b["target"])
b["hash"] = double_sha256(b["header"], hex=0)
b["hash"] = rh2s(b["hash"])
b["rawTx"] = {i: Transaction(s, format="raw")
for i in range(var_int_to_int(read_var_int(s)))}
b["tx"] = [rh2s(b["rawTx"][i]["txId"]) for i in b["rawTx"] ]
b["size"] = len(block)
for t in b["rawTx"].values():
b["amount"] += t["amount"]
b["strippedSize"] += t["bSize"]
b["strippedSize"] += var_int_len(len(b["tx"]))
b["weight"] = b["strippedSize"] * 3 + b["size"]
return b
class DependsTransaction(Exception):
def __init__(self, raw_tx_hash):
self.raw_tx_hash = raw_tx_hash
class Cache():
def __init__(self, max_size=1000000, clear_tail=True):
self._store = OrderedDict()
self._store_size = 0
self._max_size = max_size
self.clear_tail = False
self.clear_tail_auto = clear_tail
self._requests = 0
self._hit = 0
def set(self, key, value):
self._check_limit()
self._store[key] = value
self._store_size += sys.getsizeof(value) + sys.getsizeof(key)
def _check_limit(self):
if self._store_size >= self._max_size:
self.clear_tail = True
if self.clear_tail and self.clear_tail_auto:
if self._store_size >= int(self._max_size * 0.75):
try:
[self.pop_last() for i in range(20)]
except:
pass
else:
self.clear_tail = False
def get(self, key):
self._requests += 1
try:
i = self._store[key]
self._hit += 1
return i
except:
return None
def pop(self, key):
self._requests += 1
try:
data = self._store.pop(key)
self._store_size -= sys.getsizeof(data) + sys.getsizeof(key)
self._hit += 1
return data
except:
return None
def remove(self, key):
try:
data = self._store.pop(key)
self._store_size -= sys.getsizeof(data) + sys.getsizeof(key)
except:
pass
def pop_last(self):
try:
i = next(reversed(self._store))
data = self._store[i]
del self._store[i]
self._store_size -= sys.getsizeof(data) + sys.getsizeof(i)
return data
except:
return None
def get_last_key(self):
try:
i = next(reversed(self._store))
return i
except:
return None
def len(self):
return len(self._store)
def hitrate(self):
if self._requests:
return self._hit / self._requests
else:
return 0
def tm(p=None):
if p is not None:
return round(time.time() - p, 4)
return time.time()

137
pybtc/connector/utils.py Normal file
View File

@ -0,0 +1,137 @@
from pybtc.functions.tools import rh2s
from pybtc.functions.tools import var_int_to_int, var_int_len
from pybtc.functions.tools import read_var_int
from pybtc.functions.hash import double_sha256
from pybtc.transaction import Transaction
from pybtc.functions.block import bits_to_target, target_to_difficulty
from struct import unpack, pack
import io
import time
import sys
from collections import OrderedDict
def get_stream(stream):
if not isinstance(stream, io.BytesIO):
if isinstance(stream, str):
stream = bytes.fromhex(stream)
if isinstance(stream, bytes):
stream = io.BytesIO(stream)
else:
raise TypeError("object should be bytes or HEX encoded string")
return stream
def decode_block_tx(block):
s = get_stream(block)
b = dict()
b["amount"] = 0
b["strippedSize"] = 80
b["version"] = unpack("<L", s.read(4))[0]
b["versionHex"] = pack(">L", b["version"]).hex()
b["previousBlockHash"] = rh2s(s.read(32))
b["merkleRoot"] = rh2s(s.read(32))
b["time"] = unpack("<L", s.read(4))[0]
b["bits"] = s.read(4)
b["target"] = bits_to_target(unpack("<L", b["bits"])[0])
b["targetDifficulty"] = target_to_difficulty(b["target"])
b["target"] = b["target"].to_bytes(32, byteorder="little")
b["nonce"] = unpack("<L", s.read(4))[0]
s.seek(-80, 1)
b["header"] = s.read(80).hex()
b["bits"] = rh2s(b["bits"])
b["target"] = rh2s(b["target"])
b["hash"] = double_sha256(b["header"], hex=0)
b["hash"] = rh2s(b["hash"])
b["rawTx"] = dict()
b["tx"] = list()
for i in range(var_int_to_int(read_var_int(s))):
b["rawTx"][i] = Transaction(s, format="raw")
b["tx"].append(rh2s(b["rawTx"][i]["txId"]))
b["amount"] += b["rawTx"][i]["amount"]
b["strippedSize"] += b["rawTx"][i]["bSize"]
b["strippedSize"] += var_int_len(len(b["tx"]))
b["weight"] = b["strippedSize"] * 3 + b["size"]
return b
class Cache():
def __init__(self, max_size=1000000, clear_tail=True):
self._store = OrderedDict()
self._store_size = 0
self._max_size = max_size
self.clear_tail = False
self.clear_tail_auto = clear_tail
self._requests = 0
self._hit = 0
def set(self, key, value):
self._check_limit()
self._store[key] = value
self._store_size += sys.getsizeof(value) + sys.getsizeof(key)
def _check_limit(self):
if self._store_size >= self._max_size:
self.clear_tail = True
if self.clear_tail and self.clear_tail_auto:
if self._store_size >= int(self._max_size * 0.75):
try:
[self.pop_last() for i in range(20)]
except:
pass
else:
self.clear_tail = False
def get(self, key):
self._requests += 1
try:
i = self._store[key]
self._hit += 1
return i
except:
return None
def pop(self, key):
self._requests += 1
try:
data = self._store.pop(key)
self._store_size -= sys.getsizeof(data) + sys.getsizeof(key)
self._hit += 1
return data
except:
return None
def remove(self, key):
try:
data = self._store.pop(key)
self._store_size -= sys.getsizeof(data) + sys.getsizeof(key)
except:
pass
def pop_last(self):
try:
i = next(reversed(self._store))
data = self._store[i]
del self._store[i]
self._store_size -= sys.getsizeof(data) + sys.getsizeof(i)
return data
except:
return None
def get_last_key(self):
try:
i = next(reversed(self._store))
return i
except:
return None
def len(self):
return len(self._store)
def hitrate(self):
if self._requests:
return self._hit / self._requests
else:
return 0

200
pybtc/connector/utxo.py Normal file
View File

@ -0,0 +1,200 @@
from pybtc import int_to_c_int, c_int_to_int, c_int_len
import asyncio
from collections import OrderedDict
from lru import LRU
class UTXO():
def __init__(self, db_pool, loop, log, cache_size):
self.cached = LRU(cache_size)
self.missed = set()
self.destroyed = LRU(200000)
self.deleted = LRU(200000)
self.log = log
self.loaded = OrderedDict()
self.maturity = 100
self._cache_size = cache_size
self._db_pool = db_pool
self.loop = loop
self.clear_tail = False
self.last_saved_block = 0
self.last_cached_block = 0
self.save_process = False
self.load_utxo_future = asyncio.Future()
self.load_utxo_future.set_result(True)
self._requests = 0
self._failed_requests = 0
self._hit = 0
self.saved_utxo = 0
self.deleted_utxo = 0
self.deleted_utxo_saved = 0
self.loaded_utxo = 0
self.destroyed_utxo = 0
self.destroyed_utxo_block = 0
self.outs_total = 0
def set(self, outpoint, pointer, amount, address):
self.cached[outpoint] = (pointer, amount, address)
self.outs_total += 1
if pointer:
self.last_cached_block = pointer >> 42
def remove(self, outpoint):
del self.cached[outpoint]
def destroy_utxo(self, block_height):
return
block_height -= self.maturity
for key in range(self.destroyed_utxo_block + 1, block_height + 1):
if key not in self.destroyed: continue
n = set()
for outpoint in self.destroyed[key]:
try:
self.cached.pop(outpoint)
self.destroyed_utxo += 1
except:
try:
del self.loaded[outpoint]
self.destroyed_utxo += 1
n.add(outpoint)
except:
self.destroyed_utxo += 1
pass
self.deleted[key] = n
self.destroyed.pop(key)
self.destroyed_utxo_block = block_height
if len(self.cached) - self._cache_size > 0 and not self.save_process:
self.loop.create_task(self.save_utxo(block_height))
async def save_utxo(self, block_height):
# save to db tail from cache
self.save_process = True
await asyncio.sleep(2)
c = len(self.cached) - self._cache_size
try:
lb = 0
for key in iter(self.cached):
i = self.cached[key]
if c>0 and (i[0] >> 42) <= block_height:
c -= 1
lb = i[0] >> 42
continue
break
if lb:
d = set()
for key in range(self.last_saved_block + 1, lb + 1):
try:
[d.add(i) for i in self.deleted[key]]
except:
pass
a = set()
for key in iter(self.cached):
i = self.cached[key]
if (i[0] >> 42) > lb: break
a.add((key,b"".join((int_to_c_int(i[0]),
int_to_c_int(i[1]),
i[2]))))
# insert to db
async with self._db_pool.acquire() as conn:
async with conn.transaction():
if d:
await conn.execute("DELETE FROM connector_utxo WHERE "
"outpoint = ANY($1);", d)
if a:
await conn.copy_records_to_table('connector_utxo',
columns=["outpoint", "data"], records=a)
await conn.execute("UPDATE connector_utxo_state SET value = $1 "
"WHERE name = 'last_block';", lb)
await conn.execute("UPDATE connector_utxo_state SET value = $1 "
"WHERE name = 'last_cached_block';", block_height)
self.saved_utxo += len(a)
self.deleted_utxo += len(d)
# remove from cache
for key in a:
try:
self.cached.pop(key[0])
except:
pass
for key in range(self.last_saved_block + 1, lb + 1):
try:
self.deleted.pop(key)
except:
pass
self.last_saved_block = lb
finally:
self.save_process = False
def get(self, key, block_height):
self._requests += 1
try:
i = self.cached.get(key)
del self.cached[key]
self.destroyed_utxo += 1
# try:
# self.destroyed[block_height].add(key)
# except:
# self.destroyed[block_height] = {key}
self._hit += 1
return i
except:
self._failed_requests += 1
self.missed.add(key)
return None
def get_loaded(self, key, block_height):
try:
i = self.loaded[key]
try:
self.destroyed[block_height].add(key)
except:
self.destroyed[block_height] = {key}
return i
except:
return None
async def load_utxo(self):
while True:
if not self.load_utxo_future.done():
await self.load_utxo_future
continue
break
try:
self.load_utxo_future = asyncio.Future()
l = set(self.missed)
async with self._db_pool.acquire() as conn:
rows = await conn.fetch("SELECT outpoint, connector_utxo.data "
"FROM connector_utxo "
"WHERE outpoint = ANY($1);", l)
for i in l:
try:
self.missed.remove(i)
except:
pass
for row in rows:
d = row["data"]
pointer = c_int_to_int(d)
f = c_int_len(pointer)
amount = c_int_to_int(d[f:])
f += c_int_len(amount)
address = d[f:]
self.loaded[row["outpoint"]] = (pointer, amount, address)
self.loaded_utxo += 1
finally:
self.load_utxo_future.set_result(True)
def len(self):
return len(self.cached)
def hit_rate(self):
if self._requests:
return self._hit / self._requests
else:
return 0

View File

@ -613,10 +613,10 @@ class BlockDeserializeTests(unittest.TestCase):
print("decoded block load", time.time() - qt) print("decoded block load", time.time() - qt)
import cProfile import cProfile
cProfile.run("import pybtc;" # cProfile.run("import pybtc;"
"f = open('./pybtc/test/raw_block.txt');" # "f = open('./pybtc/test/raw_block.txt');"
"fc = f.readline();" # "fc = f.readline();"
"pybtc.Block(fc[:-1], format='raw', keep_raw_tx=False)") # "pybtc.Block(fc[:-1], format='raw', keep_raw_tx=False)")
# cProfile.run("import pybtc;" # cProfile.run("import pybtc;"
# "f = open('./pybtc/test/raw_block.txt');" # "f = open('./pybtc/test/raw_block.txt');"
# "fc = f.readline();" # "fc = f.readline();"

View File

@ -17,7 +17,7 @@ from pybtc.functions.script import public_key_recovery, delete_from_script
from pybtc.functions.hash import hash160, sha256, double_sha256 from pybtc.functions.hash import hash160, sha256, double_sha256
from pybtc.functions.address import hash_to_address, address_net_type, address_to_script from pybtc.functions.address import hash_to_address, address_net_type, address_to_script
from pybtc.address import PrivateKey, Address, ScriptAddress, PublicKey from pybtc.address import PrivateKey, Address, ScriptAddress, PublicKey
from collections import deque
class Transaction(dict): class Transaction(dict):
@ -65,7 +65,7 @@ class Transaction(dict):
if raw_tx is None: if raw_tx is None:
return return
self["rawTx"] = [] self["rawTx"] = deque()
rtx = self["rawTx"].append rtx = self["rawTx"].append
self["amount"] = 0 self["amount"] = 0
sw = sw_len = 0 sw = sw_len = 0
@ -73,7 +73,6 @@ class Transaction(dict):
start = stream.tell() start = stream.tell()
read = stream.read read = stream.read
tell = stream.tell tell = stream.tell
seek = stream.seek
# start deserialization # start deserialization
t = read(4) t = read(4)
rtx(t) rtx(t)