diff --git a/build/lib.macosx-10.9-x86_64-3.7/pybtc/connector.py b/build/lib.macosx-10.9-x86_64-3.7/pybtc/connector.py deleted file mode 100644 index 0468f00..0000000 --- a/build/lib.macosx-10.9-x86_64-3.7/pybtc/connector.py +++ /dev/null @@ -1,1108 +0,0 @@ -from pybtc.functions.tools import rh2s, s2rh -from pybtc.functions.tools import var_int_to_int, var_int_len -from pybtc.functions.tools import read_var_int -from pybtc.functions.hash import double_sha256 -from pybtc.transaction import Transaction -from pybtc import int_to_c_int, c_int_to_int, c_int_len, int_to_bytes -from pybtc.functions.block import bits_to_target, target_to_difficulty -from struct import unpack, pack -import sys -import traceback -import aiojsonrpc -import zmq -import zmq.asyncio -import asyncio -import time -import io -from collections import OrderedDict, deque -from lru import LRU - -class Connector: - def __init__(self, node_rpc_url, node_zerromq_url, logger, - last_block_height=0, chain_tail=None, - tx_handler=None, orphan_handler=None, - before_block_handler=None, block_handler=None, after_block_handler=None, - block_timeout=30, - deep_sync_limit=20, backlog=0, mempool_tx=True, - rpc_batch_limit=50, rpc_threads_limit=100, rpc_timeout=100, - utxo_data=False, - utxo_cache_size=1000000, - skip_opreturn=True, - block_preload_cache_limit= 1000 * 1000000, - block_hashes_cache_limit= 200 * 1000000, - postgres_pool=None): - self.loop = asyncio.get_event_loop() - - # settings - self.log = logger - self.rpc_url = node_rpc_url - self.zmq_url = node_zerromq_url - self.orphan_handler = orphan_handler - self.block_timeout = block_timeout - self.tx_handler = tx_handler - self.skip_opreturn = skip_opreturn - self.before_block_handler = before_block_handler - self.block_handler = block_handler - self.after_block_handler = after_block_handler - self.deep_sync_limit = deep_sync_limit - self.backlog = backlog - self.mempool_tx = mempool_tx - self.postgress_pool = postgres_pool - self.utxo_cache_size = utxo_cache_size - self.utxo_data = utxo_data - self.chain_tail = list(chain_tail) if chain_tail else [] - self.rpc_timeout = rpc_timeout - self.batch_limit = rpc_batch_limit - - # state and stats - self.node_last_block = None - self.utxo = None - self.cache_loading = False - self.app_block_height_on_start = int(last_block_height) if int(last_block_height) else 0 - self.last_block_height = 0 - self.last_block_utxo_cached_height = 0 - self.checkpoints = deque() - self.deep_synchronization = False - - self.block_dependency_tx = 0 # counter of tx that have dependencies in block - self.active = True - self.get_next_block_mutex = False - self.active_block = asyncio.Future() - self.active_block.set_result(True) - self.last_zmq_msg = int(time.time()) - self.total_received_tx = 0 - self.total_received_tx_stat = 0 - self.blocks_processed_count = 0 - self.blocks_decode_time = 0 - self.blocks_download_time = 0 - self.tx_processing_time = 0 - self.non_cached_blocks = 0 - self.total_received_tx_time = 0 - self.tt = 0 - self.start_time = time.time() - - # cache and system - self.block_preload_cache_limit = block_preload_cache_limit - self.block_hashes_cache_limit = block_hashes_cache_limit - self.tx_cache_limit = 100 * 100000 - self.block_headers_cache_limit = 100 * 100000 - self.block_preload = Cache(max_size=self.block_preload_cache_limit, clear_tail=False) - self.block_hashes = Cache(max_size=self.block_hashes_cache_limit) - self.block_hashes_preload_mutex = False - self.tx_cache = Cache(max_size=self.tx_cache_limit) - self.block_headers_cache = 
Cache(max_size=self.block_headers_cache_limit) - - self.block_txs_request = None - - self.connected = asyncio.Future() - self.await_tx = list() - self.missed_tx = list() - self.await_tx_future = dict() - self.add_tx_future = dict() - self.get_missed_tx_threads = 0 - self.get_missed_tx_threads_limit = rpc_threads_limit - self.tx_in_process = set() - self.zmqContext = None - self.tasks = list() - self.log.info("Node connector started") - asyncio.ensure_future(self.start(), loop=self.loop) - - async def start(self): - await self.utxo_init() - - while True: - self.log.info("Connector initialization") - try: - self.rpc = aiojsonrpc.rpc(self.rpc_url, self.loop, timeout=self.rpc_timeout) - self.node_last_block = await self.rpc.getblockcount() - except Exception as err: - self.log.error("Get node best block error:" + str(err)) - if not isinstance(self.node_last_block, int): - self.log.error("Get node best block height failed") - self.log.error("Node rpc url: "+self.rpc_url) - await asyncio.sleep(10) - continue - - self.log.info("Node best block height %s" %self.node_last_block) - self.log.info("Connector last block height %s" % self.last_block_height) - - if self.node_last_block < self.last_block_height: - self.log.error("Node is behind application blockchain state!") - await asyncio.sleep(10) - continue - elif self.node_last_block == self.last_block_height: - self.log.warning("Blockchain is synchronized") - else: - d = self.node_last_block - self.last_block_height - self.log.warning("%s blocks before synchronization" % d) - if d > self.deep_sync_limit: - self.log.warning("Deep synchronization mode") - self.deep_synchronization = True - break - - if self.utxo_data: - self.utxo = UTXO(self.postgress_pool, - self.loop, - self.log, - self.utxo_cache_size if self.deep_synchronization else 0) - - h = self.last_block_height - if h < len(self.chain_tail): - raise Exception("Chain tail len not match last block height") - for row in reversed(self.chain_tail): - self.block_headers_cache.set(row, h) - h -= 1 - - self.tasks.append(self.loop.create_task(self.zeromq_handler())) - self.tasks.append(self.loop.create_task(self.watchdog())) - self.connected.set_result(True) - # if self.preload: - # self.loop.create_task(self.preload_block()) - # self.loop.create_task(self.preload_block_hashes()) - self.get_next_block_mutex = True - self.loop.create_task(self.get_next_block()) - - async def utxo_init(self): - if self.utxo_data: - if self.postgress_pool is None: - raise Exception("UTXO data required postgresql db connection pool") - - async with self.postgress_pool.acquire() as conn: - await conn.execute("""CREATE TABLE IF NOT EXISTS - connector_utxo (outpoint BYTEA, - data BYTEA, - PRIMARY KEY(outpoint)); - """) - await conn.execute("""CREATE TABLE IF NOT EXISTS - connector_utxo_state (name VARCHAR, - value BIGINT, - PRIMARY KEY(name)); - """) - lb = await conn.fetchval("SELECT value FROM connector_utxo_state WHERE name='last_block';") - lc = await conn.fetchval("SELECT value FROM connector_utxo_state WHERE name='last_cached_block';") - if lb is None: - lb = 0 - lc = 0 - await conn.execute("INSERT INTO connector_utxo_state (name, value) " - "VALUES ('last_block', 0);") - await conn.execute("INSERT INTO connector_utxo_state (name, value) " - "VALUES ('last_cached_block', 0);") - self.last_block_height = lb - self.last_block_utxo_cached_height = lc - if self.app_block_height_on_start: - if self.app_block_height_on_start < self.last_block_utxo_cached_height: - self.log.critical("UTXO state last block %s app state last 
block %s " % (self.last_block_height, - self.last_block_utxo_cached_height)) - raise Exception("App blockchain state behind connector blockchain state") - if self.app_block_height_on_start > self.last_block_height: - self.log.warning("Connector utxo height behind App height for %s blocks ..." % - (self.app_block_height_on_start - self.last_block_height,)) - - else: - self.app_block_height_on_start = self.last_block_utxo_cached_height - - - async def zeromq_handler(self): - while True: - try: - self.zmqContext = zmq.asyncio.Context() - self.zmqSubSocket = self.zmqContext.socket(zmq.SUB) - self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock") - if self.mempool_tx: - self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawtx") - self.zmqSubSocket.connect(self.zmq_url) - self.log.info("Zeromq started") - while True: - try: - msg = await self.zmqSubSocket.recv_multipart() - topic = msg[0] - body = msg[1] - - if topic == b"hashblock": - self.last_zmq_msg = int(time.time()) - if self.deep_synchronization: - continue - hash = body.hex() - self.log.warning("New block %s" % hash) - self.loop.create_task(self._get_block_by_hash(hash)) - - elif topic == b"rawtx": - self.last_zmq_msg = int(time.time()) - if self.deep_synchronization or not self.mempool_tx: - continue - try: - self.loop.create_task(self._new_transaction(Transaction(body, format="raw"))) - except: - self.log.critical("Transaction decode failed: %s" % body.hex()) - - if not self.active: - break - except asyncio.CancelledError: - self.log.warning("Zeromq handler terminating ...") - raise - except Exception as err: - self.log.error(str(err)) - - except asyncio.CancelledError: - self.zmqContext.destroy() - self.log.warning("Zeromq handler terminated") - break - except Exception as err: - self.log.error(str(err)) - await asyncio.sleep(1) - self.log.warning("Zeromq handler reconnecting ...") - if not self.active: - self.log.warning("Zeromq handler terminated") - break - - async def watchdog(self): - """ - backup synchronization option - in case zeromq failed - """ - while True: - try: - while True: - await asyncio.sleep(20) - if int(time.time()) - self.last_zmq_msg > 300 and self.zmqContext: - self.log.error("ZerroMQ no messages about 5 minutes") - try: - self.zmqContext.destroy() - self.zmqContext = None - except: - pass - if not self.get_next_block_mutex: - self.get_next_block_mutex = True - self.loop.create_task(self.get_next_block()) - except asyncio.CancelledError: - self.log.info("connector watchdog terminated") - break - except Exception as err: - self.log.error(str(traceback.format_exc())) - self.log.error("watchdog error %s " % err) - - async def get_next_block(self): - if self.active and self.active_block.done() and self.get_next_block_mutex: - try: - if self.node_last_block <= self.last_block_height + self.backlog: - d = await self.rpc.getblockcount() - if d == self.node_last_block: - self.log.info("blockchain is synchronized with backlog %s" % self.backlog) - return - else: - self.node_last_block = d - d = self.node_last_block - self.last_block_height - - if d > self.deep_sync_limit: - if not self.deep_synchronization: - self.log.warning("Deep synchronization mode") - self.deep_synchronization = True - else: - if self.deep_synchronization: - self.log.warning("Normal synchronization mode") - # clear preload caches - self.deep_synchronization = False - - if self.deep_synchronization: - raw_block = self.block_preload.pop(self.last_block_height + 1) - if raw_block: - q = time.time() - block = decode_block_tx(raw_block) - 
self.blocks_decode_time += time.time() - q - else: - h = self.block_hashes.pop(self.last_block_height + 1) - if h is None: - h = await self.rpc.getblockhash(self.last_block_height + 1) - if not self.block_hashes_preload_mutex: - self.loop.create_task(self.preload_blocks()) - block = await self._get_block_by_hash(h) - else: - h = await self.rpc.getblockhash(self.last_block_height + 1) - block = await self._get_block_by_hash(h) - - - self.loop.create_task(self._new_block(block)) - except Exception as err: - self.log.error("get next block failed %s" % str(err)) - finally: - self.get_next_block_mutex = False - - async def _get_block_by_hash(self, hash): - self.log.debug("get block by hash %s" % hash) - try: - if self.deep_synchronization: - q = time.time() - self.non_cached_blocks += 1 - raw_block = await self.rpc.getblock(hash, 0) - self.blocks_download_time += time.time() - q - q = time.time() - block = decode_block_tx(raw_block) - self.blocks_decode_time += time.time() - q - else: - q = time.time() - block = await self.rpc.getblock(hash) - self.blocks_download_time += time.time() - q - return block - except Exception: - self.log.error("get block by hash %s FAILED" % hash) - self.log.error(str(traceback.format_exc())) - - async def _new_block(self, block): - try: - if self.block_headers_cache.get(block["hash"]) is not None: - return - - if self.deep_synchronization: - block["height"] = self.last_block_height + 1 - if not self.active or not self.active_block.done() or self.last_block_height >= block["height"]: - return - self.active_block = asyncio.Future() - - self.log.debug("Block %s %s" % (block["height"], block["hash"])) - - self.cache_loading = True if self.last_block_height < self.app_block_height_on_start else False - - - if self.deep_synchronization: - tx_bin_list = [block["rawTx"][i]["txId"] for i in block["rawTx"]] - else: - tx_bin_list = [s2rh(h) for h in block["tx"]] - await self.verify_block_position(block) - - if self.before_block_handler and not self.cache_loading: - await self.before_block_handler(block) - - await self.fetch_block_transactions(block, tx_bin_list) - - if self.block_handler and not self.cache_loading: - await self.block_handler(block) - - self.block_headers_cache.set(block["hash"], block["height"]) - self.last_block_height = block["height"] - if self.utxo_data: - if not self.deep_synchronization: - self.utxo.destroy_utxo() - elif block["height"] % 500 == 0: - self.utxo.destroy_utxo() - - self.blocks_processed_count += 1 - - for h in tx_bin_list: self.tx_cache.pop(h) - - tx_rate = round(self.total_received_tx / (time.time() - self.start_time), 4) - t = 10000 if not self.deep_synchronization else 10000 - if (self.total_received_tx - self.total_received_tx_stat) > t: - self.total_received_tx_stat = self.total_received_tx - self.log.warning("Blocks %s; tx rate: %s;" % (block["height"], tx_rate)) - if self.utxo_data: - loading = "Loading ... 
" if self.cache_loading else "" - self.log.info(loading + "UTXO %s; hit rate: %s;" % (self.utxo.len(), - self.utxo.hit_rate())) - self.log.info("Blocks downloaded %s; decoded %s" % (round(self.blocks_download_time, 4), - round(self.blocks_decode_time, 4))) - if self.deep_synchronization: - self.log.info("Blocks not cached %s; " - "cache count %s; " - "cache size %s M;" % (self.non_cached_blocks, - self.block_preload.len(), - round(self.block_preload._store_size / 1024 / 1024, 2))) - self.log.info( - "cache first %s; " - "cache last %s;" % ( - next(iter(self.block_preload._store)), - next(reversed(self.block_preload._store)))) - - self.log.info("saved utxo block %s; " - "saved utxo %s; " - "deleted utxo %s; " - "loaded utxo %s; "% (self.utxo.last_saved_block, - self.utxo.saved_utxo, - self.utxo.deleted_utxo, - self.utxo.loaded_utxo - )) - self.log.info( - "destroyed utxo %s; " - "destroyed utxo block %s; " - "outs total %s;" % ( - self.utxo.destroyed_utxo, - self.utxo.destroyed_utxo_block, - self.utxo.outs_total - )) - # self.log.info("total tx fetch time %s;" % self.total_received_tx_time) - self.log.info("tt fetch time >>%s;" % self.tt) - - # after block added handler - if self.after_block_handler and not self.cache_loading: - try: - await self.after_block_handler(block) - except: - pass - - except Exception as err: - if self.await_tx: - self.await_tx = set() - for i in self.await_tx_future: - if not self.await_tx_future[i].done(): - self.await_tx_future[i].cancel() - self.await_tx_future = dict() - self.log.error(str(traceback.format_exc())) - self.log.error("block error %s" % str(err)) - finally: - self.active_block.set_result(True) - # self.log.debug("%s block [%s tx/ %s size] processing time %s cache [%s/%s]" % - # (block["height"], - # len(block["tx"]), - # block["size"] / 1000000, - # tm(bt), - # len(self.block_hashes._store), - # len(self.block_preload._store))) - if self.node_last_block > self.last_block_height: - self.get_next_block_mutex = True - self.loop.create_task(self.get_next_block()) - - async def fetch_block_transactions(self, block, tx_bin_list): - q = time.time() - if self.deep_synchronization: - self.await_tx = set(tx_bin_list) - self.await_tx_future = {i: asyncio.Future() for i in tx_bin_list} - self.block_txs_request = asyncio.Future() - for i in block["rawTx"]: - self.loop.create_task(self._new_transaction(block["rawTx"][i], - block["time"], - block["height"], - i)) - await asyncio.wait_for(self.block_txs_request, timeout=1500) - - - elif tx_bin_list: - raise Exception("not emplemted") - missed = list(tx_bin_list) - self.log.debug("Transactions missed %s" % len(missed)) - - if missed: - self.missed_tx = set(missed) - self.await_tx = set(missed) - self.await_tx_future = {i: asyncio.Future() for i in missed} - self.block_txs_request = asyncio.Future() - self.loop.create_task(self._get_missed(False, block["time"], block["height"])) - try: - await asyncio.wait_for(self.block_txs_request, timeout=self.block_timeout) - except asyncio.CancelledError: - # refresh rpc connection session - await self.rpc.close() - self.rpc = aiojsonrpc.rpc(self.rpc_url, self.loop, timeout=self.rpc_timeout) - raise RuntimeError("block transaction request timeout") - tx_count = len(block["tx"]) - self.total_received_tx += tx_count - self.total_received_tx_time += time.time() - q - rate = round(self.total_received_tx/self.total_received_tx_time) - self.log.debug("Transactions received: %s [%s] received tx rate tx/s ->> %s <<" % (tx_count, time.time() - q, rate)) - - async def 
verify_block_position(self, block): - if "previousblockhash" not in block : - return - if self.block_headers_cache.len() == 0: - return - - lb = self.block_headers_cache.get_last_key() - if self.block_headers_cache.get_last_key() != block["previousblockhash"]: - if self.block_headers_cache.get(block["previousblockhash"]) is None and self.last_block_height: - self.log.critical("Connector error! Node out of sync " - "no parent block in chain tail %s" % block["previousblockhash"]) - raise Exception("Node out of sync") - - if self.orphan_handler: - await self.orphan_handler(self.last_block_height) - self.block_headers_cache.pop_last() - self.last_block_height -= 1 - raise Exception("Sidebranch block removed") - - async def _get_missed(self, block_hash=False, block_time=None, block_height=None): - if block_hash: - try: - block = self.block_preload.pop(block_hash) - if not block: - t = time.time() - result = await self.rpc.getblock(block_hash, 0) - dt = time.time() - t - t = time.time() - block = decode_block_tx(result) - qt = time.time() - t - self.blocks_download_time += dt - self.blocks_decode_time += qt - - self.log.debug("block downloaded %s decoded %s " % (round(dt, 4), round(qt, 4))) - for index, tx in enumerate(block): - try: - self.missed_tx.remove(block[tx]["txId"]) - self.loop.create_task(self._new_transaction(block[tx], block_time, block_height, index)) - except: - pass - except Exception as err: - self.log.error("_get_missed exception %s " % str(err)) - self.log.error(str(traceback.format_exc())) - self.await_tx = set() - self.block_txs_request.cancel() - - elif self.get_missed_tx_threads <= self.get_missed_tx_threads_limit: - self.get_missed_tx_threads += 1 - # start more threads - if len(self.missed_tx) > 1: - self.loop.create_task(self._get_missed(False, block_time, block_height)) - while True: - if not self.missed_tx: - break - try: - batch = list() - while self.missed_tx: - batch.append(["getrawtransaction", self.missed_tx.pop()]) - if len(batch) >= self.batch_limit: - break - result = await self.rpc.batch(batch) - for r in result: - try: - tx = Transaction(r["result"], format="raw") - except: - self.log.error("Transaction decode failed: %s" % r["result"]) - raise Exception("Transaction decode failed") - self.loop.create_task(self._new_transaction(tx, block_time, None, None)) - except Exception as err: - self.log.error("_get_missed exception %s " % str(err)) - self.log.error(str(traceback.format_exc())) - self.await_tx = set() - self.block_txs_request.cancel() - self.get_missed_tx_threads -= 1 - - - async def wait_block_dependences(self, tx): - while self.await_tx_future: - for i in tx["vIn"]: - try: - if not self.await_tx_future[tx["vIn"][i]["txId"]].done(): - await self.await_tx_future[tx["vIn"][i]["txId"]] - break - except: - pass - else: - break - - - async def _new_transaction(self, tx, block_time = None, block_height = None, block_index = None): - if not(tx["txId"] in self.tx_in_process or self.tx_cache.get(tx["txId"])): - try: - stxo = None - self.tx_in_process.add(tx["txId"]) - if not tx["coinbase"]: - if block_height is not None: - await self.wait_block_dependences(tx) - if self.utxo: - stxo, missed = set(), set() - for i in tx["vIn"]: - outpoint = b"".join((inp["txId"], int_to_bytes(inp["vOut"]))) - try: - stxo.add(outpoint, tx["vIn"][i]["_c_"]) - except: - inp = tx["vIn"][i] - r = self.utxo.get(outpoint, block_height) - stxo.add(r) if r else missed.add((outpoint, (block_height << 42) + (block_index << 21) + i)) - - if missed: - await self.utxo.load_utxo() - 
[stxo.add(self.utxo.get_loaded(o, block_height)) for o, s in missed] - - if len(stxo) != len(tx["vIn"]) and not self.cache_loading: - self.log.critical("utxo get failed " + rh2s(tx["txId"])) - self.log.critical(str(stxo)) - raise Exception("utxo get failed ") - - if self.tx_handler and not self.cache_loading: - await self.tx_handler(tx, stxo, block_time, block_height, block_index) - - if self.utxo: - for i in tx["vOut"]: - try: - tx["vOut"][i]["_s_"] - self.tt += 1 - except: - out = tx["vOut"][i] - # if self.skip_opreturn and out["nType"] in (3, 8): - # continue - pointer = (block_height << 42) + (block_index << 21) + i - try: - address = out["scriptPubKey"] - except: - address = b"".join((bytes([out["nType"]]), out["addressHash"])) - self.utxo.set(b"".join((tx["txId"], int_to_bytes(i))), pointer, out["value"], address) - - self.tx_cache.set(tx["txId"], True) - try: - self.await_tx.remove(tx["txId"]) - if not self.await_tx_future[tx["txId"]].done(): - self.await_tx_future[tx["txId"]].set_result(True) - if not self.await_tx: - self.block_txs_request.set_result(True) - except: - pass - except Exception as err: - if tx["txId"] in self.await_tx: - self.await_tx = set() - self.block_txs_request.cancel() - for i in self.await_tx_future: - if not self.await_tx_future[i].done(): - self.await_tx_future[i].cancel() - self.log.debug("new transaction error %s " % err) - self.log.debug(str(traceback.format_exc())) - finally: - self.tx_in_process.remove(tx["txId"]) - - def put_utxo(self, tx, block_height, block_index): - for i in tx["vOut"]: - out = tx["vOut"][i] - if self.skip_opreturn and out["nType"] in (3, 8): - continue - pointer = (block_height << 42) + (block_index << 21) + i - if "addressHash" not in out: - address = out["scriptPubKey"] - else: - address = b"".join((bytes([out["nType"]]), out["addressHash"])) - outpoint = b"".join((tx["txId"], int_to_bytes(i))) - # self.tmp[outpoint] = (pointer, out["value"], address) - self.utxo.set(outpoint, pointer, out["value"], address) - - async def get_stxo(self, tx, block_height, block_index): - stxo, missed = set(), set() - block_height = 0 if block_height is None else block_height - block_index = 0 if block_index is None else block_index - - for i in tx["vIn"]: - inp = tx["vIn"][i] - outpoint = b"".join((inp["txId"], int_to_bytes(inp["vOut"]))) - r = self.utxo.get(outpoint, block_height) - stxo.add(r) if r else missed.add((outpoint, (block_height << 42) + (block_index << 21) + i)) - - if missed: - await self.utxo.load_utxo() - [stxo.add(self.utxo.get_loaded(o, block_height)) for o, s in missed] - - if len(stxo) != len(tx["vIn"]) and not self.cache_loading: - self.log.critical("utxo get failed " + rh2s(tx["txId"])) - self.log.critical(str(stxo)) - raise Exception("utxo get failed ") - return stxo - - - async def preload_blocks(self): - if self.block_hashes_preload_mutex: - return - try: - self.block_hashes_preload_mutex = True - max_height = self.node_last_block - self.deep_synchronization - height = self.last_block_height + 1 - processed_height = self.last_block_height - - while height < max_height: - if self.block_preload._store_size < self.block_preload_cache_limit: - try: - if height < self.last_block_height: - height = self.last_block_height + 1 - batch = list() - h_list = list() - while True: - batch.append(["getblockhash", height]) - h_list.append(height) - if len(batch) >= self.batch_limit or height >= max_height: - height += 1 - break - height += 1 - result = await self.rpc.batch(batch) - h = list() - batch = list() - for lh, r in zip(h_list, 
result): - try: - self.block_hashes.set(lh, r["result"]) - batch.append(["getblock", r["result"], 0]) - h.append(lh) - except: - pass - - blocks = await self.rpc.batch(batch) - - for x,y in zip(h,blocks): - try: - self.block_preload.set(x, (y["result"])) - except: - pass - - except asyncio.CancelledError: - self.log.info("connector preload_block_hashes failed") - break - except: - pass - if processed_height < self.last_block_height: - for i in range(processed_height, self.last_block_height ): - try: - self.block_preload.remove(i) - except: - pass - processed_height = self.last_block_height - if next(iter(self.block_preload._store)) < processed_height + 1: - for i in range(next(iter(self.block_preload._store)), self.last_block_height+1): - try: - self.block_preload.remove(i) - except: - pass - if self.block_preload._store_size < self.block_preload_cache_limit * 0.9: - continue - - await asyncio.sleep(10) - # remove unused items - - finally: - self.block_hashes_preload_mutex = False - - - async def stop(self): - self.active = False - self.log.warning("New block processing restricted") - self.log.warning("Stopping node connector ...") - [task.cancel() for task in self.tasks] - await asyncio.wait(self.tasks) - if not self.active_block.done(): - self.log.warning("Waiting active block task ...") - await self.active_block - await self.rpc.close() - if self.zmqContext: - self.zmqContext.destroy() - self.log.warning('Node connector terminated') - - -class UTXO(): - def __init__(self, db_pool, loop, log, cache_size): - self.cached = LRU(cache_size) - self.missed = set() - self.destroyed = LRU(200000) - self.deleted = LRU(200000) - self.log = log - self.loaded = OrderedDict() - self.maturity = 100 - self._cache_size = cache_size - self._db_pool = db_pool - self.loop = loop - self.clear_tail = False - self.last_saved_block = 0 - self.last_cached_block = 0 - self.save_process = False - self.load_utxo_future = asyncio.Future() - self.load_utxo_future.set_result(True) - self._requests = 0 - self._failed_requests = 0 - self._hit = 0 - self.saved_utxo = 0 - self.deleted_utxo = 0 - self.deleted_utxo_saved = 0 - self.loaded_utxo = 0 - self.destroyed_utxo = 0 - self.destroyed_utxo_block = 0 - self.outs_total = 0 - - def set(self, outpoint, pointer, amount, address): - self.cached[outpoint] = (pointer, amount, address) - self.outs_total += 1 - if pointer: - self.last_cached_block = pointer >> 42 - - def remove(self, outpoint): - del self.cached[outpoint] - - def destroy_utxo(self, block_height): - block_height -= self.maturity - for key in range(self.destroyed_utxo_block + 1, block_height + 1): - if key not in self.destroyed: continue - n = set() - for outpoint in self.destroyed[key]: - try: - self.cached.pop(outpoint) - self.destroyed_utxo += 1 - except: - try: - del self.loaded[outpoint] - self.destroyed_utxo += 1 - n.add(outpoint) - except: - self.destroyed_utxo += 1 - pass - self.deleted[key] = n - self.destroyed.pop(key) - - self.destroyed_utxo_block = block_height - if len(self.cached) - self._cache_size > 0 and not self.save_process: - self.loop.create_task(self.save_utxo(block_height)) - - async def save_utxo(self, block_height): - # save to db tail from cache - self.save_process = True - await asyncio.sleep(2) - c = len(self.cached) - self._cache_size - try: - lb = 0 - for key in iter(self.cached): - i = self.cached[key] - if c>0 and (i[0] >> 42) <= block_height: - c -= 1 - lb = i[0] >> 42 - continue - break - - if lb: - d = set() - for key in range(self.last_saved_block + 1, lb + 1): - try: - 
[d.add(i) for i in self.deleted[key]] - except: - pass - - a = set() - for key in iter(self.cached): - i = self.cached[key] - if (i[0] >> 42) > lb: break - a.add((key,b"".join((int_to_c_int(i[0]), - int_to_c_int(i[1]), - i[2])))) - - # insert to db - async with self._db_pool.acquire() as conn: - async with conn.transaction(): - if d: - await conn.execute("DELETE FROM connector_utxo WHERE " - "outpoint = ANY($1);", d) - if a: - await conn.copy_records_to_table('connector_utxo', - columns=["outpoint", "data"], records=a) - await conn.execute("UPDATE connector_utxo_state SET value = $1 " - "WHERE name = 'last_block';", lb) - await conn.execute("UPDATE connector_utxo_state SET value = $1 " - "WHERE name = 'last_cached_block';", block_height) - self.saved_utxo += len(a) - self.deleted_utxo += len(d) - - # remove from cache - for key in a: - try: - self.cached.pop(key[0]) - except: - pass - - for key in range(self.last_saved_block + 1, lb + 1): - try: - self.deleted.pop(key) - except: - pass - self.last_saved_block = lb - finally: - self.save_process = False - - def get(self, key, block_height): - self._requests += 1 - try: - i = self.cached.get(key) - del self.cached[key] - self.destroyed_utxo += 1 - # try: - # self.destroyed[block_height].add(key) - # except: - # self.destroyed[block_height] = {key} - self._hit += 1 - return i - except: - self._failed_requests += 1 - self.missed.add(key) - return None - - def get_loaded(self, key, block_height): - try: - i = self.loaded[key] - try: - self.destroyed[block_height].add(key) - except: - self.destroyed[block_height] = {key} - return i - except: - return None - - async def load_utxo(self): - while True: - if not self.load_utxo_future.done(): - await self.load_utxo_future - continue - break - try: - self.load_utxo_future = asyncio.Future() - l = set(self.missed) - async with self._db_pool.acquire() as conn: - rows = await conn.fetch("SELECT outpoint, connector_utxo.data " - "FROM connector_utxo " - "WHERE outpoint = ANY($1);", l) - for i in l: - try: - self.missed.remove(i) - except: - pass - for row in rows: - d = row["data"] - pointer = c_int_to_int(d) - f = c_int_len(pointer) - amount = c_int_to_int(d[f:]) - f += c_int_len(amount) - address = d[f:] - self.loaded[row["outpoint"]] = (pointer, amount, address) - self.loaded_utxo += 1 - finally: - self.load_utxo_future.set_result(True) - - - def len(self): - return len(self.cached) - - def hit_rate(self): - if self._requests: - return self._hit / self._requests - else: - return 0 - - - -def get_stream(stream): - if not isinstance(stream, io.BytesIO): - if isinstance(stream, str): - stream = bytes.fromhex(stream) - if isinstance(stream, bytes): - stream = io.BytesIO(stream) - else: - raise TypeError("object should be bytes or HEX encoded string") - return stream - - -def decode_block_tx(block): - s = get_stream(block) - b = dict() - b["amount"] = 0 - b["strippedSize"] = 80 - b["version"] = unpack("L", b["version"]).hex() - b["previousBlockHash"] = rh2s(s.read(32)) - b["merkleRoot"] = rh2s(s.read(32)) - b["time"] = unpack("= self._max_size: - self.clear_tail = True - if self.clear_tail and self.clear_tail_auto: - if self._store_size >= int(self._max_size * 0.75): - try: - [self.pop_last() for i in range(20)] - except: - pass - else: - self.clear_tail = False - - def get(self, key): - self._requests += 1 - try: - i = self._store[key] - self._hit += 1 - return i - except: - return None - - def pop(self, key): - self._requests += 1 - try: - data = self._store.pop(key) - self._store_size -= 
sys.getsizeof(data) + sys.getsizeof(key) - self._hit += 1 - return data - except: - return None - - def remove(self, key): - try: - data = self._store.pop(key) - self._store_size -= sys.getsizeof(data) + sys.getsizeof(key) - except: - pass - - def pop_last(self): - try: - i = next(reversed(self._store)) - data = self._store[i] - del self._store[i] - self._store_size -= sys.getsizeof(data) + sys.getsizeof(i) - return data - except: - return None - - def get_last_key(self): - try: - i = next(reversed(self._store)) - return i - except: - return None - - def len(self): - return len(self._store) - - def hitrate(self): - if self._requests: - return self._hit / self._requests - else: - return 0 - - -def tm(p=None): - if p is not None: - return round(time.time() - p, 4) - return time.time() - diff --git a/pybtc/__init__.py b/pybtc/__init__.py index c43aa70..f6b92a8 100644 --- a/pybtc/__init__.py +++ b/pybtc/__init__.py @@ -7,5 +7,6 @@ from .block import * from .address import * from .wallet import * from .crypto import * +from lru import LRU from pybtc.connector import Connector diff --git a/pybtc/connector/block_loader.py b/pybtc/connector/block_loader.py index 6fe4d81..aa86076 100644 --- a/pybtc/connector/block_loader.py +++ b/pybtc/connector/block_loader.py @@ -47,7 +47,7 @@ class BlockLoader: if next(iter(self.parent.block_preload._store)) <= self.parent.last_block_height: for i in range(next(iter(self.parent.block_preload._store)), self.parent.last_block_height + 1): - try: self.parent.block_preload.remove(i) + try: del self.parent.block_preload.cached[i] except: pass except asyncio.CancelledError: diff --git a/pybtc/connector/utxo.py b/pybtc/connector/utxo.py index 0d72fde..560e750 100644 --- a/pybtc/connector/utxo.py +++ b/pybtc/connector/utxo.py @@ -1,25 +1,28 @@ from pybtc import int_to_c_int, c_int_to_int, c_int_len import asyncio from collections import OrderedDict, deque -from lru import LRU - +from collections import OrderedDict, deque as LRU +from pybtc import LRU class UTXO(): - def __init__(self, db_pool, loop, log, cache_size): - self.cached = LRU() + def __init__(self, db_pool, loop, log, cache_size, block_txo_max = 500000): + self.cached = LRU(cache_size) self.missed = set() self.destroyed = deque() self.deleted = LRU(200000) self.log = log self.loaded = OrderedDict() self.maturity = 100 - self._cache_size = cache_size + self.block_txo_max = block_txo_max + self._cache_hard_limit = cache_size - block_txo_max + self._cache_soft_limit = cache_size - block_txo_max * 2 self._db_pool = db_pool self.loop = loop self.clear_tail = False self.last_saved_block = 0 self.last_cached_block = 0 - self.save_process = False + self.save_future = asyncio.Future() + self.save_future.set_result(True) self.load_utxo_future = asyncio.Future() self.load_utxo_future.set_result(True) self._requests = 0 @@ -35,9 +38,7 @@ class UTXO(): def set(self, outpoint, pointer, amount, address): self.cached[outpoint] = (pointer, amount, address) - self.outs_total += 1 - if pointer: - self.last_cached_block = pointer >> 42 + def remove(self, outpoint): del self.cached[outpoint] @@ -56,73 +57,79 @@ class UTXO(): self.destroyed_utxo += 1 pass - # if len(self.cached) - self._cache_size > 0 and not self.save_process: + # if len(self.cached) > self._cache_hard_limit: + # await self.save_utxo() + # elif len(self.cached) > self._cache_soft_limit and self.save_future.done(): # self.loop.create_task(self.save_utxo()) - async def save_utxo(self, block_height): + async def save_utxo(self): # save to db tail from cache - 
self.save_process = True - await asyncio.sleep(2) - c = len(self.cached) - self._cache_size - try: - lb = 0 - for key in iter(self.cached): - i = self.cached[key] - if c>0 and (i[0] >> 42) <= block_height: - c -= 1 - lb = i[0] >> 42 - continue - break - - if lb: - d = set() - for key in range(self.last_saved_block + 1, lb + 1): - try: - [d.add(i) for i in self.deleted[key]] - except: - pass - - a = set() + if not self.save_future.done(): + await self.save_future.done() + return + self.save_future = asyncio.Future() + while True: + c = len(self.cached) - self._cache_soft_limit - self.block_txo_max + if c <= 0: break + try: + lb = 0 for key in iter(self.cached): i = self.cached[key] - if (i[0] >> 42) > lb: break - a.add((key,b"".join((int_to_c_int(i[0]), - int_to_c_int(i[1]), - i[2])))) + if c>0 and (i[0] >> 42) <= block_height: + c -= 1 + lb = i[0] >> 42 + continue + break - # insert to db - async with self._db_pool.acquire() as conn: - async with conn.transaction(): - if d: - await conn.execute("DELETE FROM connector_utxo WHERE " - "outpoint = ANY($1);", d) - if a: - await conn.copy_records_to_table('connector_utxo', - columns=["outpoint", "data"], records=a) - await conn.execute("UPDATE connector_utxo_state SET value = $1 " - "WHERE name = 'last_block';", lb) - await conn.execute("UPDATE connector_utxo_state SET value = $1 " - "WHERE name = 'last_cached_block';", block_height) - self.saved_utxo += len(a) - self.deleted_utxo += len(d) + if lb: + d = set() + for key in range(self.last_saved_block + 1, lb + 1): + try: + [d.add(i) for i in self.deleted[key]] + except: + pass - # remove from cache - for key in a: - try: - self.cached.pop(key[0]) - except: - pass + a = set() + for key in iter(self.cached): + i = self.cached[key] + if (i[0] >> 42) > lb: break + a.add((key,b"".join((int_to_c_int(i[0]), + int_to_c_int(i[1]), + i[2])))) - for key in range(self.last_saved_block + 1, lb + 1): - try: - self.deleted.pop(key) - except: - pass - self.last_saved_block = lb - finally: - self.save_process = False + # insert to db + async with self._db_pool.acquire() as conn: + async with conn.transaction(): + if d: + await conn.execute("DELETE FROM connector_utxo WHERE " + "outpoint = ANY($1);", d) + if a: + await conn.copy_records_to_table('connector_utxo', + columns=["outpoint", "data"], records=a) + await conn.execute("UPDATE connector_utxo_state SET value = $1 " + "WHERE name = 'last_block';", lb) + await conn.execute("UPDATE connector_utxo_state SET value = $1 " + "WHERE name = 'last_cached_block';", block_height) + self.saved_utxo += len(a) + self.deleted_utxo += len(d) + + # remove from cache + for key in a: + try: + self.cached.pop(key[0]) + except: + pass + + for key in range(self.last_saved_block + 1, lb + 1): + try: + self.deleted.pop(key) + except: + pass + self.last_saved_block = lb + finally: + self.save_future = False def get(self, key): self._requests += 1 diff --git a/pybtc/lru/lru.c b/pybtc/lru/lru.c new file mode 100644 index 0000000..371d2a7 --- /dev/null +++ b/pybtc/lru/lru.c @@ -0,0 +1,714 @@ +#include + +/* + * This is a simple implementation of LRU Dict that uses a Python dict and an associated doubly linked + * list to keep track of recently inserted/accessed items. + * + * Dict will store: key -> Node mapping, where Node is a linked list node. + * The Node itself will contain the value as well as the key. 
+ * + * For eg: + * + * >>> l = LRU(2) + * >>> l[0] = 'foo' + * >>> l[1] = 'bar' + * + * can be visualised as: + * + * ---+--(hash(0)--+--hash(1)--+ + * self->dict ...| | | + * ---+-----|------+---------|-+ + * | | + * +-----v------+ +-----v------+ + * self->first--->|<'foo'>, <0>|-->|<'bar'>, <1>|<---self->last + * +--| |<--| |--+ + * | +------------+ +------------+ | + * v v + * NULL NULL + * + * The invariant is to maintain the list to reflect the LRU order of items in the dict. + * self->first will point to the MRU item and self-last to LRU item. Size of list will not + * grow beyond size of LRU dict. + * + */ + +#ifndef Py_TYPE + #define Py_TYPE(ob) (((PyObject*)(ob))->ob_type) +#endif + +#define GET_NODE(d, key) (Node *) Py_TYPE(d)->tp_as_mapping->mp_subscript((d), (key)) +#define PUT_NODE(d, key, node) Py_TYPE(d)->tp_as_mapping->mp_ass_subscript((d), (key), ((PyObject *)node)) + +/* If someone figures out how to enable debug builds with setuptools, you can delete this */ +#if 0 +#undef assert +#define str(s) #s +#define assert(v) \ + do { \ + if (!(v)) { \ + fprintf(stderr, "Assertion failed: %s on %s:%d\n", \ + str(v), __FILE__, __LINE__); \ + fflush(stderr); \ + abort(); \ + } \ + } while(0) +#endif + +typedef struct _Node { + PyObject_HEAD + PyObject * value; + PyObject * key; + struct _Node * prev; + struct _Node * next; +} Node; + +static void +node_dealloc(Node* self) +{ + Py_DECREF(self->key); + Py_DECREF(self->value); + assert(self->prev == NULL); + assert(self->next == NULL); + PyObject_Del((PyObject*)self); +} + +static PyObject* +node_repr(Node* self) +{ + return PyObject_Repr(self->value); +} + +static PyTypeObject NodeType = { + PyVarObject_HEAD_INIT(NULL, 0) + "lru.Node", /* tp_name */ + sizeof(Node), /* tp_basicsize */ + 0, /* tp_itemsize */ + (destructor)node_dealloc,/* tp_dealloc */ + 0, /* tp_print */ + 0, /* tp_getattr */ + 0, /* tp_setattr */ + 0, /* tp_compare */ + (reprfunc)node_repr, /* tp_repr */ + 0, /* tp_as_number */ + 0, /* tp_as_sequence */ + 0, /* tp_as_mapping */ + 0, /* tp_hash */ + 0, /* tp_call */ + 0, /* tp_str */ + 0, /* tp_getattro */ + 0, /* tp_setattro */ + 0, /* tp_as_buffer */ + Py_TPFLAGS_DEFAULT, /* tp_flags */ + "Linked List Node", /* tp_doc */ + 0, /* tp_traverse */ + 0, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + 0, /* tp_methods */ + 0, /* tp_members */ + 0, /* tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + 0, /* tp_init */ + 0, /* tp_alloc */ + 0, /* tp_new */ +}; + +typedef struct { + PyObject_HEAD + PyObject * dict; + Node * first; + Node * last; + Py_ssize_t size; + Py_ssize_t hits; + Py_ssize_t misses; + PyObject *callback; +} LRU; + + +static PyObject * +set_callback(LRU *self, PyObject *args) +{ + PyObject *result = NULL; + PyObject *temp; + + if (PyArg_ParseTuple(args, "O:set_callback", &temp)) { + if (temp == Py_None) { + Py_XDECREF(self->callback); + self->callback = NULL; + } else if (!PyCallable_Check(temp)) { + PyErr_SetString(PyExc_TypeError, "parameter must be callable"); + return NULL; + } else { + Py_XINCREF(temp); /* Add a reference to new callback */ + Py_XDECREF(self->callback); /* Dispose of previous callback */ + self->callback = temp; /* Remember new callback */ + } + Py_RETURN_NONE; + } + return result; +} + +static void +lru_remove_node(LRU *self, Node* node) +{ + if (self->first == node) { + self->first = node->next; + } + if (self->last == node) { + self->last = 
node->prev; + } + if (node->prev) { + node->prev->next = node->next; + } + if (node->next) { + node->next->prev = node->prev; + } + node->next = node->prev = NULL; +} + +static void +lru_add_node_at_head(LRU *self, Node* node) +{ + node->prev = NULL; + if (!self->first) { + self->first = self->last = node; + node->next = NULL; + } else { + node->next = self->first; + if (node->next) { + node->next->prev = node; + } + self->first = node; + } +} + +static void +lru_delete_last(LRU *self) +{ + PyObject *arglist; + PyObject *result; + Node* n = self->last; + + if (!self->last) + return; + + if (self->callback) { + + arglist = Py_BuildValue("OO", n->key, n->value); + result = PyObject_CallObject(self->callback, arglist); + Py_XDECREF(result); + Py_DECREF(arglist); + } + + lru_remove_node(self, n); + PUT_NODE(self->dict, n->key, NULL); +} + +static Py_ssize_t +lru_length(LRU *self) +{ + return PyDict_Size(self->dict); +} + +static PyObject * +LRU_contains_key(LRU *self, PyObject *key) +{ + if (PyDict_Contains(self->dict, key)) { + Py_RETURN_TRUE; + } else { + Py_RETURN_FALSE; + } +} + +static PyObject * +LRU_contains(LRU *self, PyObject *args) +{ + PyObject *key; + if (!PyArg_ParseTuple(args, "O", &key)) + return NULL; + return LRU_contains_key(self, key); +} + +static int +LRU_seq_contains(LRU *self, PyObject *key) +{ + return PyDict_Contains(self->dict, key); +} + +static PyObject * +lru_subscript(LRU *self, register PyObject *key) +{ + Node *node = GET_NODE(self->dict, key); + if (!node) { + self->misses++; + return NULL; + } + + assert(PyObject_TypeCheck(node, &NodeType)); + + /* We don't need to move the node when it's already self->first. */ + if (node != self->first) { + lru_remove_node(self, node); + lru_add_node_at_head(self, node); + } + + self->hits++; + Py_INCREF(node->value); + Py_DECREF(node); + return node->value; +} + +static PyObject * +LRU_get(LRU *self, PyObject *args) +{ + PyObject *key; + PyObject *instead = NULL; + PyObject *result; + + if (!PyArg_ParseTuple(args, "O|O", &key, &instead)) + return NULL; + + result = lru_subscript(self, key); + PyErr_Clear(); /* GET_NODE sets an exception on miss. Shut it up. */ + if (result) + return result; + + if (!instead) { + Py_RETURN_NONE; + } + + Py_INCREF(instead); + return instead; +} + +static int +lru_ass_sub(LRU *self, PyObject *key, PyObject *value) +{ + int res = 0; + Node *node = GET_NODE(self->dict, key); + PyErr_Clear(); /* GET_NODE sets an exception on miss. Shut it up. 
*/ + + if (value) { + if (node) { + Py_INCREF(value); + Py_DECREF(node->value); + node->value = value; + + lru_remove_node(self, node); + lru_add_node_at_head(self, node); + + res = 0; + } else { + node = PyObject_NEW(Node, &NodeType); + node->key = key; + node->value = value; + node->next = node->prev = NULL; + + Py_INCREF(key); + Py_INCREF(value); + + res = PUT_NODE(self->dict, key, node); + if (res == 0) { + if (lru_length(self) > self->size) { + lru_delete_last(self); + } + + lru_add_node_at_head(self, node); + } + } + } else { + res = PUT_NODE(self->dict, key, NULL); + if (res == 0) { + assert(node && PyObject_TypeCheck(node, &NodeType)); + lru_remove_node(self, node); + } + } + + Py_XDECREF(node); + return res; +} + +static PyMappingMethods LRU_as_mapping = { + (lenfunc)lru_length, /*mp_length*/ + (binaryfunc)lru_subscript, /*mp_subscript*/ + (objobjargproc)lru_ass_sub, /*mp_ass_subscript*/ +}; + +static PyObject * +collect(LRU *self, PyObject * (*getterfunc)(Node *)) +{ + register PyObject *v; + Node *curr; + int i; + v = PyList_New(lru_length(self)); + if (v == NULL) + return NULL; + curr = self->first; + i = 0; + + while (curr) { + PyList_SET_ITEM(v, i++, getterfunc(curr)); + curr = curr->next; + } + assert(i == lru_length(self)); + return v; +} + +static PyObject * +get_key(Node *node) +{ + Py_INCREF(node->key); + return node->key; +} + +static PyObject * +LRU_update(LRU *self, PyObject *args, PyObject *kwargs) +{ + PyObject *key, *value; + PyObject *arg = NULL; + Py_ssize_t pos = 0; + + if ((PyArg_ParseTuple(args, "|O", &arg))) { + if (arg && PyDict_Check(arg)) { + while (PyDict_Next(arg, &pos, &key, &value)) + lru_ass_sub(self, key, value); + } + } + + if (kwargs != NULL && PyDict_Check(kwargs)) { + while (PyDict_Next(kwargs, &pos, &key, &value)) + lru_ass_sub(self, key, value); + } + + Py_RETURN_NONE; +} + +static PyObject * +LRU_peek_first_item(LRU *self) +{ + if (self->first) { + PyObject *tuple = PyTuple_New(2); + Py_INCREF(self->first->key); + PyTuple_SET_ITEM(tuple, 0, self->first->key); + Py_INCREF(self->first->value); + PyTuple_SET_ITEM(tuple, 1, self->first->value); + return tuple; + } + else Py_RETURN_NONE; +} + +static PyObject * +LRU_peek_last_item(LRU *self) +{ + if (self->last) { + PyObject *tuple = PyTuple_New(2); + Py_INCREF(self->last->key); + PyTuple_SET_ITEM(tuple, 0, self->last->key); + Py_INCREF(self->last->value); + PyTuple_SET_ITEM(tuple, 1, self->last->value); + return tuple; + } + else Py_RETURN_NONE; +} + +static PyObject * +LRU_keys(LRU *self) { + return collect(self, get_key); +} + +static PyObject * +get_value(Node *node) +{ + Py_INCREF(node->value); + return node->value; +} + +static PyObject * +LRU_values(LRU *self) +{ + return collect(self, get_value); +} + +static PyObject * +LRU_set_callback(LRU *self, PyObject *args) +{ + return set_callback(self, args); +} + +static PyObject * +get_item(Node *node) +{ + PyObject *tuple = PyTuple_New(2); + Py_INCREF(node->key); + PyTuple_SET_ITEM(tuple, 0, node->key); + Py_INCREF(node->value); + PyTuple_SET_ITEM(tuple, 1, node->value); + return tuple; +} + +static PyObject * +LRU_items(LRU *self) +{ + return collect(self, get_item); +} + +static PyObject * +LRU_set_size(LRU *self, PyObject *args, PyObject *kwds) +{ + Py_ssize_t newSize; + if (!PyArg_ParseTuple(args, "n", &newSize)) { + return NULL; + } + if (newSize <= 0) { + PyErr_SetString(PyExc_ValueError, "Size should be a positive number"); + return NULL; + } + while (lru_length(self) > newSize) { + lru_delete_last(self); + } + self->size = newSize; + 
Py_RETURN_NONE; +} + +static PyObject * +LRU_clear(LRU *self) +{ + Node *c = self->first; + + while (c) { + Node* n = c; + c = c->next; + lru_remove_node(self, n); + } + PyDict_Clear(self->dict); + + self->hits = 0; + self->misses = 0; + Py_RETURN_NONE; +} + + +static PyObject * +LRU_get_size(LRU *self) +{ + return Py_BuildValue("i", self->size); +} + +static PyObject * +LRU_get_stats(LRU *self) +{ + return Py_BuildValue("nn", self->hits, self->misses); +} + + +/* Hack to implement "key in lru" */ +static PySequenceMethods lru_as_sequence = { + 0, /* sq_length */ + 0, /* sq_concat */ + 0, /* sq_repeat */ + 0, /* sq_item */ + 0, /* sq_slice */ + 0, /* sq_ass_item */ + 0, /* sq_ass_slice */ + (objobjproc) LRU_seq_contains, /* sq_contains */ + 0, /* sq_inplace_concat */ + 0, /* sq_inplace_repeat */ +}; + +static PyMethodDef LRU_methods[] = { + {"__contains__", (PyCFunction)LRU_contains_key, METH_O | METH_COEXIST, + PyDoc_STR("L.__contains__(key) -> Check if key is there in L")}, + {"keys", (PyCFunction)LRU_keys, METH_NOARGS, + PyDoc_STR("L.keys() -> list of L's keys in MRU order")}, + {"values", (PyCFunction)LRU_values, METH_NOARGS, + PyDoc_STR("L.values() -> list of L's values in MRU order")}, + {"items", (PyCFunction)LRU_items, METH_NOARGS, + PyDoc_STR("L.items() -> list of L's items (key,value) in MRU order")}, + {"has_key", (PyCFunction)LRU_contains, METH_VARARGS, + PyDoc_STR("L.has_key(key) -> Check if key is there in L")}, + {"get", (PyCFunction)LRU_get, METH_VARARGS, + PyDoc_STR("L.get(key, instead) -> If L has key return its value, otherwise instead")}, + {"set_size", (PyCFunction)LRU_set_size, METH_VARARGS, + PyDoc_STR("L.set_size() -> set size of LRU")}, + {"get_size", (PyCFunction)LRU_get_size, METH_NOARGS, + PyDoc_STR("L.get_size() -> get size of LRU")}, + {"clear", (PyCFunction)LRU_clear, METH_NOARGS, + PyDoc_STR("L.clear() -> clear LRU")}, + {"get_stats", (PyCFunction)LRU_get_stats, METH_NOARGS, + PyDoc_STR("L.get_stats() -> returns a tuple with cache hits and misses")}, + {"peek_first_item", (PyCFunction)LRU_peek_first_item, METH_NOARGS, + PyDoc_STR("L.peek_first_item() -> returns the MRU item (key,value) without changing key order")}, + {"peek_last_item", (PyCFunction)LRU_peek_last_item, METH_NOARGS, + PyDoc_STR("L.peek_last_item() -> returns the LRU item (key,value) without changing key order")}, + {"update", (PyCFunction)LRU_update, METH_VARARGS | METH_KEYWORDS, + PyDoc_STR("L.update() -> update value for key in LRU")}, + {"set_callback", (PyCFunction)LRU_set_callback, METH_VARARGS, + PyDoc_STR("L.set_callback(callback) -> set a callback to call when an item is evicted.")}, + {NULL, NULL}, +}; + +static PyObject* +LRU_repr(LRU* self) +{ + return PyObject_Repr(self->dict); +} + +static int +LRU_init(LRU *self, PyObject *args, PyObject *kwds) +{ + static char *kwlist[] = {"size", "callback", NULL}; + PyObject *callback = NULL; + self->callback = NULL; + if (!PyArg_ParseTupleAndKeywords(args, kwds, "n|O", kwlist, &self->size, &callback)) { + return -1; + } + + if (callback && callback != Py_None) { + if (!PyCallable_Check(callback)) { + PyErr_SetString(PyExc_TypeError, "parameter must be callable"); + return -1; + } + Py_XINCREF(callback); + self->callback = callback; + } + + if ((Py_ssize_t)self->size <= 0) { + PyErr_SetString(PyExc_ValueError, "Size should be a positive number"); + return -1; + } + self->dict = PyDict_New(); + self->first = self->last = NULL; + self->hits = 0; + self->misses = 0; + return 0; +} + +static void +LRU_dealloc(LRU *self) +{ + if (self->dict) { + 
LRU_clear(self); + Py_DECREF(self->dict); + Py_XDECREF(self->callback); + } + PyObject_Del((PyObject*)self); +} + +PyDoc_STRVAR(lru_doc, +"LRU(size, callback=None) -> new LRU dict that can store up to size elements\n" +"An LRU dict behaves like a standard dict, except that it stores only fixed\n" +"set of elements. Once the size overflows, it evicts least recently used\n" +"items. If a callback is set it will call the callback with the evicted key\n" +" and item.\n\n" +"Eg:\n" +">>> l = LRU(3)\n" +">>> for i in range(5):\n" +">>> l[i] = str(i)\n" +">>> l.keys()\n" +"[2,3,4]\n\n" +"Note: An LRU(n) can be thought of as a dict that will have the most\n" +"recently accessed n items.\n"); + +static PyTypeObject LRUType = { + PyVarObject_HEAD_INIT(NULL, 0) + "lru.LRU", /* tp_name */ + sizeof(LRU), /* tp_basicsize */ + 0, /* tp_itemsize */ + (destructor)LRU_dealloc, /* tp_dealloc */ + 0, /* tp_print */ + 0, /* tp_getattr */ + 0, /* tp_setattr */ + 0, /* tp_compare */ + (reprfunc)LRU_repr, /* tp_repr */ + 0, /* tp_as_number */ + &lru_as_sequence, /* tp_as_sequence */ + &LRU_as_mapping, /* tp_as_mapping */ + 0, /* tp_hash */ + 0, /* tp_call */ + 0, /* tp_str */ + 0, /* tp_getattro */ + 0, /* tp_setattro */ + 0, /* tp_as_buffer */ + Py_TPFLAGS_DEFAULT, /* tp_flags */ + lru_doc, /* tp_doc */ + 0, /* tp_traverse */ + 0, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + LRU_methods, /* tp_methods */ + 0, /* tp_members */ + 0, /* tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + (initproc)LRU_init, /* tp_init */ + 0, /* tp_alloc */ + 0, /* tp_new */ +}; + +#if PY_MAJOR_VERSION >= 3 + static struct PyModuleDef moduledef = { + PyModuleDef_HEAD_INIT, + "lru", /* m_name */ + lru_doc, /* m_doc */ + -1, /* m_size */ + NULL, /* m_methods */ + NULL, /* m_reload */ + NULL, /* m_traverse */ + NULL, /* m_clear */ + NULL, /* m_free */ + }; +#endif + +static PyObject * +moduleinit(void) +{ + PyObject *m; + + NodeType.tp_new = PyType_GenericNew; + if (PyType_Ready(&NodeType) < 0) + return NULL; + + LRUType.tp_new = PyType_GenericNew; + if (PyType_Ready(&LRUType) < 0) + return NULL; + + #if PY_MAJOR_VERSION >= 3 + m = PyModule_Create(&moduledef); + #else + m = Py_InitModule3("lru", NULL, lru_doc); + #endif + + if (m == NULL) + return NULL; + + Py_INCREF(&NodeType); + Py_INCREF(&LRUType); + PyModule_AddObject(m, "LRU", (PyObject *) &LRUType); + + return m; +} + +#if PY_MAJOR_VERSION < 3 + PyMODINIT_FUNC + initlru(void) + { + moduleinit(); + } +#else + PyMODINIT_FUNC + PyInit_lru(void) + { + return moduleinit(); + } +#endif \ No newline at end of file diff --git a/setup.py b/setup.py index fe93df0..dce07aa 100644 --- a/setup.py +++ b/setup.py @@ -140,7 +140,6 @@ setup(name='pybtc', package_data={ 'pybtc': ['bip39_word_list/*.txt', 'test/*.txt'], }, - install_requires=['lru-dict', 'msgpack'], cmdclass={ 'build_clib': build_clib, 'build_ext': build_ext, @@ -149,11 +148,9 @@ setup(name='pybtc', 'bdist_wheel': bdist_wheel }, distclass=Distribution, - ext_modules=[ - Extension("_secp256k1", - ["pybtc/_secp256k1/module_secp256k1.c"], - include_dirs=["libsecp256k1/include/", - "libsecp256k1/src/"]), + ext_modules=[Extension("lru", ["pybtc/lru/lru.c"]), + Extension("_secp256k1", ["pybtc/_secp256k1/module_secp256k1.c"], + include_dirs=["libsecp256k1/include/", "libsecp256k1/src/"]), Extension("_crypto", ["pybtc/_crypto/module_crypto.cpp", "pybtc/_crypto/crypto/aes.cpp",
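
A minimal sketch of how the bundled lru extension behaves once setup.py builds it, based only on the API defined in pybtc/lru/lru.c in this patch (mapping protocol, get with a fallback, peek_last_item, get_stats, set_size, and the eviction callback that lru_delete_last invokes with the evicted key/value pair); the printed values are illustrative, not asserted output:

    from lru import LRU   # extension module built from pybtc/lru/lru.c

    def on_evict(key, value):
        # lru_delete_last() calls the callback with the evicted (key, value) pair
        print("evicted", key, value)

    l = LRU(2, callback=on_evict)   # fixed capacity of 2, optional eviction callback
    l["a"] = 1
    l["b"] = 2
    l["c"] = 3                      # over capacity: least recently used pair ("a", 1) is evicted
    print(l.keys())                 # keys in MRU order
    print(l.get("a", "missing"))    # miss returns the fallback instead of raising
    print(l.peek_last_item())       # LRU (key, value) pair without touching recency
    print(l.get_stats())            # (hits, misses) counters
    l.set_size(10)                  # resize the dict in place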
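
Throughout connector.py and connector/utxo.py each output position is packed into one integer, (block_height << 42) + (block_index << 21) + output_index, and the cache key is the raw outpoint txId + int_to_bytes(vOut). A small worked sketch of that packing; the source only ever extracts the height (pointer >> 42), so the two 21-bit masks below are inferred from the shift amounts, not stated in the code:

    from pybtc import int_to_bytes

    def pack_pointer(block_height, block_index, out_index):
        # same layout as the connector: height in the high bits, then tx index, then output index
        return (block_height << 42) + (block_index << 21) + out_index

    def unpack_pointer(pointer):
        # inferred inverse: 21 bits each for the tx index and the output index
        return pointer >> 42, (pointer >> 21) & 0x1FFFFF, pointer & 0x1FFFFF

    p = pack_pointer(550000, 1500, 2)
    assert unpack_pointer(p) == (550000, 1500, 2)

    tx_id = bytes(32)                            # placeholder 32-byte txId
    outpoint = b"".join((tx_id, int_to_bytes(2)))  # UTXO cache / db key, as in the source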
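
The record format stored in connector_utxo.data is also visible in this patch: save_utxo() writes int_to_c_int(pointer) + int_to_c_int(amount) + address, and load_utxo() parses it back with c_int_to_int/c_int_len. A self-contained sketch of that round trip, using a made-up 21-byte address payload (type byte plus a zeroed 20-byte hash):

    from pybtc import int_to_c_int, c_int_to_int, c_int_len

    def encode_utxo(pointer, amount, address):
        # mirrors save_utxo(): compact-int pointer, compact-int amount, raw address tail
        return b"".join((int_to_c_int(pointer), int_to_c_int(amount), address))

    def decode_utxo(data):
        # mirrors load_utxo(): read two compact ints, the remainder is the address
        pointer = c_int_to_int(data)
        f = c_int_len(pointer)
        amount = c_int_to_int(data[f:])
        f += c_int_len(amount)
        return pointer, amount, data[f:]

    pointer = (550000 << 42) + (1 << 21) + 0
    record = encode_utxo(pointer, 5000000000, b"\x00" + bytes(20))
    assert decode_utxo(record) == (pointer, 5000000000, b"\x00" + bytes(20))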