From 7b21405480b43a75e133cb5846ced97c5876490e Mon Sep 17 00:00:00 2001 From: 4tochka Date: Tue, 7 May 2019 00:19:39 +0400 Subject: [PATCH] connector --- pybtc/__init__.py | 2 +- pybtc/block.py | 10 +- pybtc/connector/__init__.py | 1 + pybtc/connector/block_loader.py | 171 +++++++++++++ pybtc/{ => connector}/connector.py | 374 ++--------------------------- pybtc/connector/utils.py | 137 +++++++++++ pybtc/connector/utxo.py | 200 +++++++++++++++ pybtc/test/block.py | 8 +- pybtc/transaction.py | 5 +- 9 files changed, 540 insertions(+), 368 deletions(-) create mode 100644 pybtc/connector/__init__.py create mode 100644 pybtc/connector/block_loader.py rename pybtc/{ => connector}/connector.py (73%) create mode 100644 pybtc/connector/utils.py create mode 100644 pybtc/connector/utxo.py diff --git a/pybtc/__init__.py b/pybtc/__init__.py index 444eebf..c43aa70 100644 --- a/pybtc/__init__.py +++ b/pybtc/__init__.py @@ -7,5 +7,5 @@ from .block import * from .address import * from .wallet import * from .crypto import * -from .connector import Connector +from pybtc.connector import Connector diff --git a/pybtc/block.py b/pybtc/block.py index 170fe52..135833e 100644 --- a/pybtc/block.py +++ b/pybtc/block.py @@ -52,11 +52,11 @@ class Block(dict): block_target = int.from_bytes(self["hash"], byteorder="little") self["difficulty"] = target_to_difficulty(block_target) tx_count = var_int_to_int(read_var_int(s)) - self["tx"] = {i: Transaction(s, format="raw", keep_raw_tx=keep_raw_tx) - for i in range(tx_count)} - for t in self["tx"].values(): - self["amount"] += t["amount"] - self["strippedSize"] += t["bSize"] + self["tx"] = dict() + for i in range(tx_count): + self["tx"][i] = Transaction(s, format="raw", keep_raw_tx=keep_raw_tx) + self["amount"] += self["tx"][i]["amount"] + self["strippedSize"] += self["tx"][i]["bSize"] self["strippedSize"] += var_int_len(tx_count) self["weight"] = self["strippedSize"] * 3 + self["size"] if format == "decoded": diff --git a/pybtc/connector/__init__.py b/pybtc/connector/__init__.py new file mode 100644 index 0000000..c5f92bc --- /dev/null +++ b/pybtc/connector/__init__.py @@ -0,0 +1 @@ +from .connector import * \ No newline at end of file diff --git a/pybtc/connector/block_loader.py b/pybtc/connector/block_loader.py new file mode 100644 index 0000000..7194a14 --- /dev/null +++ b/pybtc/connector/block_loader.py @@ -0,0 +1,171 @@ +import asyncio +import os +from multiprocessing import Process +from concurrent.futures import ThreadPoolExecutor +from setproctitle import setproctitle +import logging +import signal +import sys + +class BlockLoader: + def __init__(self, parent, workers=4): + self.worker = list() + self.log = parent.log + self.loop = parent.loop + self.loop.set_default_executor(ThreadPoolExecutor(workers * 2)) + [self.loop.create_task(self.start_worker(i)) for i in range(workers)] + + async def start_worker(self,index): + self.log.warning('Start block loader worker %s' % index) + # prepare pipes for communications + in_reader, in_writer = os.pipe() + out_reader, out_writer = os.pipe() + in_reader, out_reader = os.fdopen(in_reader,'rb'), os.fdopen(out_reader,'rb') + in_writer, out_writer = os.fdopen(in_writer,'wb'), os.fdopen(out_writer,'wb') + + # create new process + worker = Process(target=Worker, args=(index, in_reader, in_writer, out_reader, out_writer)) + worker.start() + in_reader.close() + out_writer.close() + # get stream reader + worker.reader = await self.get_pipe_reader(out_reader) + worker.writer = in_writer + worker.name = str(index) + self.worker[index] = worker + # start message loop + self.loop.create_task(self.message_loop(self.worker[index])) + # wait if process crash + await self.loop.run_in_executor(None, worker.join) + del self.worker[index] + self.log.warning('Block loader worker %s is stopped' % index) + + + async def get_pipe_reader(self, fd_reader): + reader = asyncio.StreamReader() + protocol = asyncio.StreamReaderProtocol(reader) + try: + await self.loop.connect_read_pipe(lambda: protocol, fd_reader) + except: + return None + return reader + + async def pipe_get_msg(self, reader): + while True: + try: + msg = await reader.readexactly(1) + if msg == b'M': + msg = await reader.readexactly(1) + if msg == b'E': + msg = await reader.readexactly(4) + c = int.from_bytes(msg, byteorder='little') + msg = await reader.readexactly(c) + if msg: + return msg[:20].rstrip(), msg[20:] + if not msg: + return b'pipe_read_error', b'' + except: + return b'pipe_read_error', b'' + + def pipe_sent_msg(self, writer, msg_type, msg): + msg_type = msg_type[:20].ljust(20) + msg = msg_type + msg + msg = b''.join((b'ME', len(msg).to_bytes(4, byteorder='little'), msg)) + writer.write(msg) + writer.flush() + + + + async def message_loop(self, worker): + while True: + msg_type, msg = await self.pipe_get_msg(worker.reader) + if msg_type == b'pipe_read_error': + if not worker.is_alive(): + return + continue + + if msg_type == b'result': + msg + continue + + + # def disconnect(self,ip): + # """ Disconnect peer """ + # p = self.out_connection_pool[self.outgoing_connection[ip]["pool"]] + # pipe_sent_msg(p.writer, b'disconnect', ip.encode()) + + + + + +class Worker: + + def __init__(self, name , in_reader, in_writer, out_reader, out_writer): + setproctitle('Block loader: worker %s' % name) + self.name = name + in_writer.close() + out_reader.close() + policy = asyncio.get_event_loop_policy() + policy.set_event_loop(policy.new_event_loop()) + self.loop = asyncio.get_event_loop() + self.log = logging.getLogger("Block loader") + self.log.setLevel(logging.INFO) + self.loop.set_default_executor(ThreadPoolExecutor(20)) + self.out_writer = out_writer + self.in_reader = in_reader + signal.signal(signal.SIGTERM, self.terminate) + self.loop.create_task(self.message_loop()) + self.loop.run_forever() + + + def terminate(self,a,b): + sys.exit(0) + + async def get_pipe_reader(self, fd_reader): + reader = asyncio.StreamReader() + protocol = asyncio.StreamReaderProtocol(reader) + try: + await self.loop.connect_read_pipe(lambda: protocol, fd_reader) + except: + return None + return reader + + async def pipe_get_msg(self, reader): + while True: + try: + msg = await reader.readexactly(1) + if msg == b'M': + msg = await reader.readexactly(1) + if msg == b'E': + msg = await reader.readexactly(4) + c = int.from_bytes(msg, byteorder='little') + msg = await reader.readexactly(c) + if msg: + return msg[:20].rstrip(), msg[20:] + if not msg: + return b'pipe_read_error', b'' + except: + return b'pipe_read_error', b'' + + def pipe_sent_msg(self, writer, msg_type, msg): + msg_type = msg_type[:20].ljust(20) + msg = msg_type + msg + msg = b''.join((b'ME', len(msg).to_bytes(4, byteorder='little'), msg)) + writer.write(msg) + writer.flush() + + + + async def message_loop(self, worker): + while True: + msg_type, msg = await self.pipe_get_msg(worker.reader) + if msg_type == b'pipe_read_error': + if not worker.is_alive(): + return + continue + + if msg_type == b'result': + msg + continue + + diff --git a/pybtc/connector.py b/pybtc/connector/connector.py similarity index 73% rename from pybtc/connector.py rename to pybtc/connector/connector.py index 04b183b..73bb4a3 100644 --- a/pybtc/connector.py +++ b/pybtc/connector/connector.py @@ -1,21 +1,18 @@ from pybtc.functions.tools import rh2s, s2rh -from pybtc.functions.tools import var_int_to_int, var_int_len -from pybtc.functions.tools import read_var_int -from pybtc.functions.hash import double_sha256 +from pybtc.connector.block_loader import BlockLoader +from pybtc.connector.utxo import UTXO +from pybtc.connector.utils import decode_block_tx +from pybtc.connector.utils import Cache from pybtc.transaction import Transaction -from pybtc import int_to_c_int, c_int_to_int, c_int_len, int_to_bytes -from pybtc.functions.block import bits_to_target, target_to_difficulty -from struct import unpack, pack -import sys +from pybtc import int_to_bytes + import traceback import aiojsonrpc import zmq import zmq.asyncio import asyncio import time -import io -from collections import OrderedDict -from lru import LRU + class Connector: def __init__(self, node_rpc_url, node_zerromq_url, logger, @@ -36,6 +33,8 @@ class Connector: # settings self.log = logger self.rpc_url = node_rpc_url + self.rpc_timeout = rpc_timeout + self.rpc_batch_limit = rpc_batch_limit self.zmq_url = node_zerromq_url self.orphan_handler = orphan_handler self.block_timeout = block_timeout @@ -51,8 +50,7 @@ class Connector: self.utxo_cache_size = utxo_cache_size self.utxo_data = utxo_data self.chain_tail = list(chain_tail) if chain_tail else [] - self.rpc_timeout = rpc_timeout - self.batch_limit = rpc_batch_limit + # state and stats self.node_last_block = None @@ -151,13 +149,11 @@ class Connector: for row in reversed(self.chain_tail): self.block_headers_cache.set(row, h) h -= 1 + self.block_loader = BlockLoader(self) self.tasks.append(self.loop.create_task(self.zeromq_handler())) self.tasks.append(self.loop.create_task(self.watchdog())) self.connected.set_result(True) - # if self.preload: - # self.loop.create_task(self.preload_block()) - # self.loop.create_task(self.preload_block_hashes()) self.get_next_block_mutex = True self.loop.create_task(self.get_next_block()) @@ -303,12 +299,8 @@ class Connector: self.deep_synchronization = False if self.deep_synchronization: - raw_block = self.block_preload.pop(self.last_block_height + 1) - if raw_block: - q = time.time() - block = decode_block_tx(raw_block) - self.blocks_decode_time += time.time() - q - else: + block = self.block_preload.pop(self.last_block_height + 1) + if not block: h = self.block_hashes.pop(self.last_block_height + 1) if h is None: h = await self.rpc.getblockhash(self.last_block_height + 1) @@ -559,7 +551,7 @@ class Connector: batch = list() while self.missed_tx: batch.append(["getrawtransaction", self.missed_tx.pop()]) - if len(batch) >= self.batch_limit: + if len(batch) >= self.rpc_batch_limit: break result = await self.rpc.batch(batch) for r in result: @@ -665,6 +657,7 @@ class Connector: async def preload_blocks(self): + return if self.block_hashes_preload_mutex: return try: @@ -683,7 +676,7 @@ class Connector: while True: batch.append(["getblockhash", height]) h_list.append(height) - if len(batch) >= self.batch_limit or height >= max_height: + if len(batch) >= self.rpc_batch_limit or height >= max_height: height += 1 break height += 1 @@ -698,19 +691,19 @@ class Connector: except: pass - blocks = await self.rpc.batch(batch) + blocks = await self.block_loader.load_blocks(batch) for x,y in zip(h,blocks): try: - self.block_preload.set(x, (y["result"])) + self.block_preload.set(x, y) except: pass - except asyncio.CancelledError: self.log.info("connector preload_block_hashes failed") break except: pass + if processed_height < self.last_block_height: for i in range(processed_height, self.last_block_height ): try: @@ -749,334 +742,5 @@ class Connector: self.log.warning('Node connector terminated') -class UTXO(): - def __init__(self, db_pool, loop, log, cache_size): - self.cached = LRU(cache_size) - self.missed = set() - self.destroyed = LRU(200000) - self.deleted = LRU(200000) - self.log = log - self.loaded = OrderedDict() - self.maturity = 100 - self._cache_size = cache_size - self._db_pool = db_pool - self.loop = loop - self.clear_tail = False - self.last_saved_block = 0 - self.last_cached_block = 0 - self.save_process = False - self.load_utxo_future = asyncio.Future() - self.load_utxo_future.set_result(True) - self._requests = 0 - self._failed_requests = 0 - self._hit = 0 - self.saved_utxo = 0 - self.deleted_utxo = 0 - self.deleted_utxo_saved = 0 - self.loaded_utxo = 0 - self.destroyed_utxo = 0 - self.destroyed_utxo_block = 0 - self.outs_total = 0 - - def set(self, outpoint, pointer, amount, address): - self.cached[outpoint] = (pointer, amount, address) - self.outs_total += 1 - if pointer: - self.last_cached_block = pointer >> 42 - - def remove(self, outpoint): - del self.cached[outpoint] - - def destroy_utxo(self, block_height): - return - block_height -= self.maturity - for key in range(self.destroyed_utxo_block + 1, block_height + 1): - if key not in self.destroyed: continue - n = set() - for outpoint in self.destroyed[key]: - try: - self.cached.pop(outpoint) - self.destroyed_utxo += 1 - except: - try: - del self.loaded[outpoint] - self.destroyed_utxo += 1 - n.add(outpoint) - except: - self.destroyed_utxo += 1 - pass - self.deleted[key] = n - self.destroyed.pop(key) - - self.destroyed_utxo_block = block_height - if len(self.cached) - self._cache_size > 0 and not self.save_process: - self.loop.create_task(self.save_utxo(block_height)) - - async def save_utxo(self, block_height): - # save to db tail from cache - self.save_process = True - await asyncio.sleep(2) - c = len(self.cached) - self._cache_size - try: - lb = 0 - for key in iter(self.cached): - i = self.cached[key] - if c>0 and (i[0] >> 42) <= block_height: - c -= 1 - lb = i[0] >> 42 - continue - break - - if lb: - d = set() - for key in range(self.last_saved_block + 1, lb + 1): - try: - [d.add(i) for i in self.deleted[key]] - except: - pass - - a = set() - for key in iter(self.cached): - i = self.cached[key] - if (i[0] >> 42) > lb: break - a.add((key,b"".join((int_to_c_int(i[0]), - int_to_c_int(i[1]), - i[2])))) - - # insert to db - async with self._db_pool.acquire() as conn: - async with conn.transaction(): - if d: - await conn.execute("DELETE FROM connector_utxo WHERE " - "outpoint = ANY($1);", d) - if a: - await conn.copy_records_to_table('connector_utxo', - columns=["outpoint", "data"], records=a) - await conn.execute("UPDATE connector_utxo_state SET value = $1 " - "WHERE name = 'last_block';", lb) - await conn.execute("UPDATE connector_utxo_state SET value = $1 " - "WHERE name = 'last_cached_block';", block_height) - self.saved_utxo += len(a) - self.deleted_utxo += len(d) - - # remove from cache - for key in a: - try: - self.cached.pop(key[0]) - except: - pass - - for key in range(self.last_saved_block + 1, lb + 1): - try: - self.deleted.pop(key) - except: - pass - self.last_saved_block = lb - finally: - self.save_process = False - - def get(self, key, block_height): - self._requests += 1 - try: - i = self.cached.get(key) - del self.cached[key] - self.destroyed_utxo += 1 - # try: - # self.destroyed[block_height].add(key) - # except: - # self.destroyed[block_height] = {key} - self._hit += 1 - return i - except: - self._failed_requests += 1 - self.missed.add(key) - return None - - def get_loaded(self, key, block_height): - try: - i = self.loaded[key] - try: - self.destroyed[block_height].add(key) - except: - self.destroyed[block_height] = {key} - return i - except: - return None - - async def load_utxo(self): - while True: - if not self.load_utxo_future.done(): - await self.load_utxo_future - continue - break - try: - self.load_utxo_future = asyncio.Future() - l = set(self.missed) - async with self._db_pool.acquire() as conn: - rows = await conn.fetch("SELECT outpoint, connector_utxo.data " - "FROM connector_utxo " - "WHERE outpoint = ANY($1);", l) - for i in l: - try: - self.missed.remove(i) - except: - pass - for row in rows: - d = row["data"] - pointer = c_int_to_int(d) - f = c_int_len(pointer) - amount = c_int_to_int(d[f:]) - f += c_int_len(amount) - address = d[f:] - self.loaded[row["outpoint"]] = (pointer, amount, address) - self.loaded_utxo += 1 - finally: - self.load_utxo_future.set_result(True) - def len(self): - return len(self.cached) - - def hit_rate(self): - if self._requests: - return self._hit / self._requests - else: - return 0 - - - -def get_stream(stream): - if not isinstance(stream, io.BytesIO): - if isinstance(stream, str): - stream = bytes.fromhex(stream) - if isinstance(stream, bytes): - stream = io.BytesIO(stream) - else: - raise TypeError("object should be bytes or HEX encoded string") - return stream - - -def decode_block_tx(block): - s = get_stream(block) - b = dict() - b["amount"] = 0 - b["strippedSize"] = 80 - b["version"] = unpack("L", b["version"]).hex() - b["previousBlockHash"] = rh2s(s.read(32)) - b["merkleRoot"] = rh2s(s.read(32)) - b["time"] = unpack("= self._max_size: - self.clear_tail = True - if self.clear_tail and self.clear_tail_auto: - if self._store_size >= int(self._max_size * 0.75): - try: - [self.pop_last() for i in range(20)] - except: - pass - else: - self.clear_tail = False - - def get(self, key): - self._requests += 1 - try: - i = self._store[key] - self._hit += 1 - return i - except: - return None - - def pop(self, key): - self._requests += 1 - try: - data = self._store.pop(key) - self._store_size -= sys.getsizeof(data) + sys.getsizeof(key) - self._hit += 1 - return data - except: - return None - - def remove(self, key): - try: - data = self._store.pop(key) - self._store_size -= sys.getsizeof(data) + sys.getsizeof(key) - except: - pass - - def pop_last(self): - try: - i = next(reversed(self._store)) - data = self._store[i] - del self._store[i] - self._store_size -= sys.getsizeof(data) + sys.getsizeof(i) - return data - except: - return None - - def get_last_key(self): - try: - i = next(reversed(self._store)) - return i - except: - return None - - def len(self): - return len(self._store) - - def hitrate(self): - if self._requests: - return self._hit / self._requests - else: - return 0 - - -def tm(p=None): - if p is not None: - return round(time.time() - p, 4) - return time.time() - diff --git a/pybtc/connector/utils.py b/pybtc/connector/utils.py new file mode 100644 index 0000000..a857ab3 --- /dev/null +++ b/pybtc/connector/utils.py @@ -0,0 +1,137 @@ +from pybtc.functions.tools import rh2s +from pybtc.functions.tools import var_int_to_int, var_int_len +from pybtc.functions.tools import read_var_int +from pybtc.functions.hash import double_sha256 +from pybtc.transaction import Transaction +from pybtc.functions.block import bits_to_target, target_to_difficulty +from struct import unpack, pack +import io +import time +import sys +from collections import OrderedDict + + +def get_stream(stream): + if not isinstance(stream, io.BytesIO): + if isinstance(stream, str): + stream = bytes.fromhex(stream) + if isinstance(stream, bytes): + stream = io.BytesIO(stream) + else: + raise TypeError("object should be bytes or HEX encoded string") + return stream + + +def decode_block_tx(block): + s = get_stream(block) + b = dict() + b["amount"] = 0 + b["strippedSize"] = 80 + b["version"] = unpack("L", b["version"]).hex() + b["previousBlockHash"] = rh2s(s.read(32)) + b["merkleRoot"] = rh2s(s.read(32)) + b["time"] = unpack("= self._max_size: + self.clear_tail = True + if self.clear_tail and self.clear_tail_auto: + if self._store_size >= int(self._max_size * 0.75): + try: + [self.pop_last() for i in range(20)] + except: + pass + else: + self.clear_tail = False + + def get(self, key): + self._requests += 1 + try: + i = self._store[key] + self._hit += 1 + return i + except: + return None + + def pop(self, key): + self._requests += 1 + try: + data = self._store.pop(key) + self._store_size -= sys.getsizeof(data) + sys.getsizeof(key) + self._hit += 1 + return data + except: + return None + + def remove(self, key): + try: + data = self._store.pop(key) + self._store_size -= sys.getsizeof(data) + sys.getsizeof(key) + except: + pass + + def pop_last(self): + try: + i = next(reversed(self._store)) + data = self._store[i] + del self._store[i] + self._store_size -= sys.getsizeof(data) + sys.getsizeof(i) + return data + except: + return None + + def get_last_key(self): + try: + i = next(reversed(self._store)) + return i + except: + return None + + def len(self): + return len(self._store) + + def hitrate(self): + if self._requests: + return self._hit / self._requests + else: + return 0 + + diff --git a/pybtc/connector/utxo.py b/pybtc/connector/utxo.py new file mode 100644 index 0000000..29294ff --- /dev/null +++ b/pybtc/connector/utxo.py @@ -0,0 +1,200 @@ +from pybtc import int_to_c_int, c_int_to_int, c_int_len +import asyncio +from collections import OrderedDict +from lru import LRU + + +class UTXO(): + def __init__(self, db_pool, loop, log, cache_size): + self.cached = LRU(cache_size) + self.missed = set() + self.destroyed = LRU(200000) + self.deleted = LRU(200000) + self.log = log + self.loaded = OrderedDict() + self.maturity = 100 + self._cache_size = cache_size + self._db_pool = db_pool + self.loop = loop + self.clear_tail = False + self.last_saved_block = 0 + self.last_cached_block = 0 + self.save_process = False + self.load_utxo_future = asyncio.Future() + self.load_utxo_future.set_result(True) + self._requests = 0 + self._failed_requests = 0 + self._hit = 0 + self.saved_utxo = 0 + self.deleted_utxo = 0 + self.deleted_utxo_saved = 0 + self.loaded_utxo = 0 + self.destroyed_utxo = 0 + self.destroyed_utxo_block = 0 + self.outs_total = 0 + + def set(self, outpoint, pointer, amount, address): + self.cached[outpoint] = (pointer, amount, address) + self.outs_total += 1 + if pointer: + self.last_cached_block = pointer >> 42 + + def remove(self, outpoint): + del self.cached[outpoint] + + def destroy_utxo(self, block_height): + return + block_height -= self.maturity + for key in range(self.destroyed_utxo_block + 1, block_height + 1): + if key not in self.destroyed: continue + n = set() + for outpoint in self.destroyed[key]: + try: + self.cached.pop(outpoint) + self.destroyed_utxo += 1 + except: + try: + del self.loaded[outpoint] + self.destroyed_utxo += 1 + n.add(outpoint) + except: + self.destroyed_utxo += 1 + pass + self.deleted[key] = n + self.destroyed.pop(key) + + self.destroyed_utxo_block = block_height + if len(self.cached) - self._cache_size > 0 and not self.save_process: + self.loop.create_task(self.save_utxo(block_height)) + + async def save_utxo(self, block_height): + # save to db tail from cache + self.save_process = True + await asyncio.sleep(2) + c = len(self.cached) - self._cache_size + try: + lb = 0 + for key in iter(self.cached): + i = self.cached[key] + if c>0 and (i[0] >> 42) <= block_height: + c -= 1 + lb = i[0] >> 42 + continue + break + + if lb: + d = set() + for key in range(self.last_saved_block + 1, lb + 1): + try: + [d.add(i) for i in self.deleted[key]] + except: + pass + + a = set() + for key in iter(self.cached): + i = self.cached[key] + if (i[0] >> 42) > lb: break + a.add((key,b"".join((int_to_c_int(i[0]), + int_to_c_int(i[1]), + i[2])))) + + # insert to db + async with self._db_pool.acquire() as conn: + async with conn.transaction(): + if d: + await conn.execute("DELETE FROM connector_utxo WHERE " + "outpoint = ANY($1);", d) + if a: + await conn.copy_records_to_table('connector_utxo', + columns=["outpoint", "data"], records=a) + await conn.execute("UPDATE connector_utxo_state SET value = $1 " + "WHERE name = 'last_block';", lb) + await conn.execute("UPDATE connector_utxo_state SET value = $1 " + "WHERE name = 'last_cached_block';", block_height) + self.saved_utxo += len(a) + self.deleted_utxo += len(d) + + # remove from cache + for key in a: + try: + self.cached.pop(key[0]) + except: + pass + + for key in range(self.last_saved_block + 1, lb + 1): + try: + self.deleted.pop(key) + except: + pass + self.last_saved_block = lb + finally: + self.save_process = False + + def get(self, key, block_height): + self._requests += 1 + try: + i = self.cached.get(key) + del self.cached[key] + self.destroyed_utxo += 1 + # try: + # self.destroyed[block_height].add(key) + # except: + # self.destroyed[block_height] = {key} + self._hit += 1 + return i + except: + self._failed_requests += 1 + self.missed.add(key) + return None + + def get_loaded(self, key, block_height): + try: + i = self.loaded[key] + try: + self.destroyed[block_height].add(key) + except: + self.destroyed[block_height] = {key} + return i + except: + return None + + async def load_utxo(self): + while True: + if not self.load_utxo_future.done(): + await self.load_utxo_future + continue + break + try: + self.load_utxo_future = asyncio.Future() + l = set(self.missed) + async with self._db_pool.acquire() as conn: + rows = await conn.fetch("SELECT outpoint, connector_utxo.data " + "FROM connector_utxo " + "WHERE outpoint = ANY($1);", l) + for i in l: + try: + self.missed.remove(i) + except: + pass + for row in rows: + d = row["data"] + pointer = c_int_to_int(d) + f = c_int_len(pointer) + amount = c_int_to_int(d[f:]) + f += c_int_len(amount) + address = d[f:] + self.loaded[row["outpoint"]] = (pointer, amount, address) + self.loaded_utxo += 1 + finally: + self.load_utxo_future.set_result(True) + + + def len(self): + return len(self.cached) + + def hit_rate(self): + if self._requests: + return self._hit / self._requests + else: + return 0 + diff --git a/pybtc/test/block.py b/pybtc/test/block.py index 04b970a..3ccecb6 100644 --- a/pybtc/test/block.py +++ b/pybtc/test/block.py @@ -613,10 +613,10 @@ class BlockDeserializeTests(unittest.TestCase): print("decoded block load", time.time() - qt) import cProfile - cProfile.run("import pybtc;" - "f = open('./pybtc/test/raw_block.txt');" - "fc = f.readline();" - "pybtc.Block(fc[:-1], format='raw', keep_raw_tx=False)") + # cProfile.run("import pybtc;" + # "f = open('./pybtc/test/raw_block.txt');" + # "fc = f.readline();" + # "pybtc.Block(fc[:-1], format='raw', keep_raw_tx=False)") # cProfile.run("import pybtc;" # "f = open('./pybtc/test/raw_block.txt');" # "fc = f.readline();" diff --git a/pybtc/transaction.py b/pybtc/transaction.py index efe8401..3e29976 100644 --- a/pybtc/transaction.py +++ b/pybtc/transaction.py @@ -17,7 +17,7 @@ from pybtc.functions.script import public_key_recovery, delete_from_script from pybtc.functions.hash import hash160, sha256, double_sha256 from pybtc.functions.address import hash_to_address, address_net_type, address_to_script from pybtc.address import PrivateKey, Address, ScriptAddress, PublicKey - +from collections import deque class Transaction(dict): @@ -65,7 +65,7 @@ class Transaction(dict): if raw_tx is None: return - self["rawTx"] = [] + self["rawTx"] = deque() rtx = self["rawTx"].append self["amount"] = 0 sw = sw_len = 0 @@ -73,7 +73,6 @@ class Transaction(dict): start = stream.tell() read = stream.read tell = stream.tell - seek = stream.seek # start deserialization t = read(4) rtx(t)