from pybtc.functions.tools import rh2s
from pybtc.functions.tools import var_int_to_int
from pybtc.functions.tools import read_var_int
from pybtc.functions.tools import bytes_from_hex
from pybtc.transaction import Transaction
import traceback
import aiojsonrpc
import zmq
import zmq.asyncio
import asyncio
import time
import io
from collections import OrderedDict


class Connector:
    """Keep an application synchronized with a bitcoind node.

    The connector polls the node over JSON-RPC, subscribes to its ZeroMQ
    ``hashblock`` (and optionally ``rawtx``) notifications and passes blocks
    and transactions to the coroutine handlers supplied by the application.

    Handlers are optional coroutines and are awaited as:
        tx_handler(tx, await_tx_future, block_time, block_height, block_index)
        orphan_handler(last_block_height)
        before_block_handler(block)
        block_handler(block)
        after_block_handler(block)

    ``connected`` is a future resolved once initialization has finished; it
    receives an exception when the supplied chain tail is inconsistent.
    """

    def __init__(self, node_rpc_url, node_zerromq_url, logger,
                 last_block_height=0, chain_tail=None,
                 mempool_tx_list=None,
                 tx_handler=None, orphan_handler=None,
                 before_block_handler=None, block_handler=None, after_block_handler=None,
                 block_timeout=30,
                 deep_sync_limit=20, backlog=0, mempool_tx=True,
                 rpc_batch_limit=20, rpc_threads_limit=100, rpc_timeout=100,
                 preload=False):
        """Store settings, reset state and schedule ``start()`` on the loop.

        Args:
            node_rpc_url: JSON-RPC url of the node.
            node_zerromq_url: ZeroMQ publisher url of the node.
            logger: logger object used for all connector messages.
            last_block_height: height of the last block the application holds.
            chain_tail: block hashes ending at ``last_block_height``; they seed
                the block cache (keys are presumably hex strings, as the
                cache is later keyed by ``block["hash"]`` - verify with caller).
            mempool_tx_list: transaction hashes already known to the
                application; they seed the transaction cache.
            block_timeout: seconds to wait for the transactions of one block.
            deep_sync_limit: block lag above which deep synchronization mode
                (whole-block download, mempool ignored) is enabled.
            backlog: number of tip blocks to stay behind the node.
            mempool_tx: subscribe to ``rawtx`` notifications when true.
            rpc_batch_limit: maximum requests per RPC batch.
            rpc_threads_limit: maximum concurrent missed-transaction workers.
            rpc_timeout: timeout passed to the RPC client.
            preload: start background block preloading tasks when true.
        """
        self.loop = asyncio.get_event_loop()

        # settings
        self.log = logger
        self.rpc_url = node_rpc_url
        self.zmq_url = node_zerromq_url
        self.orphan_handler = orphan_handler
        self.block_timeout = block_timeout
        self.tx_handler = tx_handler
        self.before_block_handler = before_block_handler
        self.block_handler = block_handler
        self.after_block_handler = after_block_handler
        self.deep_sync_limit = deep_sync_limit
        self.backlog = backlog
        self.mempool_tx = mempool_tx
        self.chain_tail = list(chain_tail) if chain_tail else []
        self.mempool_tx_list = list(mempool_tx_list) if mempool_tx_list else []
        self.rpc_timeout = rpc_timeout
        self.batch_limit = rpc_batch_limit

        # state and stats
        self.rpc = None
        self.node_last_block = None
        self.last_block_height = int(last_block_height)
        self.deep_synchronization = False

        self.block_dependency_tx = 0  # counter of tx that have dependencies in block
        self.active = True
        self.get_next_block_mutex = asyncio.Future()
        self.get_next_block_mutex.set_result(True)
        self.active_block = asyncio.Future()
        self.active_block.set_result(True)
        self.last_zmq_msg = int(time.time())
        self.total_received_tx = 0
        self.blocks_processed_count = 0
        self.blocks_decode_time = 0
        self.blocks_download_time = 0
        self.blocks_processing_time = 0
        self.total_received_tx_time = 0

        # cache and system
        self.preload = preload
        self.block_preload = Cache(max_size=50000)
        self.block_hashes_preload = Cache(max_size=50000)
        self.tx_cache = Cache(max_size=50000)
        self.block_cache = Cache(max_size=10000)

        self.block_txs_request = None

        self.connected = asyncio.Future()
        # Both collections are rebound to sets while a block is processed, so
        # start with sets to keep ``remove``/``pop`` semantics uniform.
        self.await_tx_list = set()
        self.missed_tx_list = set()
        self.await_tx_future = dict()
        self.add_tx_future = dict()
        self.get_missed_tx_threads = 0
        self.get_missed_tx_threads_limit = rpc_threads_limit
        self.tx_in_process = set()
        self.zmqContext = None
        self.zmqSubSocket = None
        self.tasks = list()
        self.log.info("Node connector started")
        asyncio.ensure_future(self.start(), loop=self.loop)

    async def _open_rpc(self):
        """Close the current RPC client (best effort) and create a fresh one."""
        if self.rpc is not None:
            try:
                await self.rpc.close()
            except Exception as err:
                self.log.debug("rpc close failed %s" % err)
        self.rpc = aiojsonrpc.rpc(self.rpc_url, self.loop, timeout=self.rpc_timeout)

    def _destroy_zmq(self):
        """Destroy the ZeroMQ context if there is one; never raises."""
        context, self.zmqContext = self.zmqContext, None
        if context is None:
            return
        try:
            context.destroy()
        except Exception as err:
            self.log.debug("zeromq context destroy failed %s" % err)

    def _abort_block_request(self):
        """Drop the awaited transaction list and cancel the pending block request."""
        self.await_tx_list = set()
        if self.block_txs_request is not None:
            self.block_txs_request.cancel()

    async def start(self):
        """Wait until the node is reachable, seed the caches and start workers."""
        while True:
            if not self.active:
                # stop() was requested while the node was still unreachable
                return
            self.log.info("Connector initialization")
            try:
                await self._open_rpc()
                self.node_last_block = await self.rpc.getblockcount()
            except Exception as err:
                self.log.error("Get node best block error:" + str(err))
            if not isinstance(self.node_last_block, int):
                self.log.error("Get node best block height failed")
                self.log.error("Node rpc url: " + self.rpc_url)
                await asyncio.sleep(10)
                continue

            self.log.info("Node best block height %s" % self.node_last_block)
            self.log.info("Connector last block height %s" % self.last_block_height)

            if self.node_last_block < self.last_block_height:
                self.log.error("Node is behind application blockchain state!")
                await asyncio.sleep(10)
                continue
            if self.node_last_block == self.last_block_height:
                self.log.warning("Blockchain is synchronized")
            else:
                lag = self.node_last_block - self.last_block_height
                self.log.warning("%s blocks before synchronization" % lag)
                if lag > self.deep_sync_limit:
                    self.log.warning("Deep synchronization mode")
                    self.deep_synchronization = True
            break

        for tx_hash in self.mempool_tx_list:
            self.tx_cache.set(tx_hash, True)
        height = self.last_block_height
        if height < len(self.chain_tail):
            # Raising here would only kill this background task and leave
            # everybody awaiting ``connected`` hanging; report through the future.
            error = ValueError("Chain tail len not match last block height")
            self.log.critical(str(error))
            self.connected.set_exception(error)
            return
        for block_hash in reversed(self.chain_tail):
            self.block_cache.set(block_hash, height)
            height -= 1

        self.tasks.append(self.loop.create_task(self.zeromq_handler()))
        self.tasks.append(self.loop.create_task(self.watchdog()))
        self.connected.set_result(True)
        if self.preload:
            # tracked so that stop() cancels them together with the other workers
            self.tasks.append(self.loop.create_task(self.preload_block()))
            self.tasks.append(self.loop.create_task(self.preload_block_hashes()))
        self.loop.create_task(self.get_next_block())

    def _handle_zmq_message(self, topic, body):
        """Dispatch one ZeroMQ notification to the block or transaction pipeline."""
        if topic == b"hashblock":
            self.last_zmq_msg = int(time.time())
            block_hash = body.hex()
            self.log.warning("New block %s" % block_hash)
            self.loop.create_task(self._get_block_by_hash(block_hash))
        elif topic == b"rawtx":
            self.last_zmq_msg = int(time.time())
            if self.deep_synchronization:
                # mempool transactions are ignored while catching up
                return
            try:
                tx = Transaction(body, format="raw")
            except Exception:
                self.log.critical("Transaction decode failed: %s" % body.hex())
                return
            self.loop.create_task(self._new_transaction(tx))

    async def zeromq_handler(self):
        """Receive node notifications, reconnecting whenever the socket fails."""
        while True:
            try:
                self.zmqContext = zmq.asyncio.Context()
                self.zmqSubSocket = self.zmqContext.socket(zmq.SUB)
                self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock")
                if self.mempool_tx:
                    self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawtx")
                self.zmqSubSocket.connect(self.zmq_url)
                # give the fresh connection a full silence window before the
                # watchdog is allowed to tear it down again
                self.last_zmq_msg = int(time.time())
                self.log.info("Zeromq started")
                while True:
                    try:
                        msg = await self.zmqSubSocket.recv_multipart()
                        self._handle_zmq_message(msg[0], msg[1])
                        if not self.active:
                            break
                    except asyncio.CancelledError:
                        self.log.warning("Zeromq handler terminating ...")
                        raise
                    except zmq.ZMQError:
                        # socket level failure: leave the receive loop and
                        # reconnect instead of spinning on a dead socket
                        raise
                    except Exception as err:
                        self.log.error(str(err))

            except asyncio.CancelledError:
                # NOTE(review): pyzmq appears to cancel pending receive futures
                # when the watchdog destroys the context - confirm. In that
                # case the connector is still active and must reconnect.
                watchdog_reset = self.active and self.zmqContext is None
                self._destroy_zmq()
                if watchdog_reset:
                    self.log.warning("Zeromq handler reconnecting ...")
                    continue
                self.log.warning("Zeromq handler terminated")
                break
            except Exception as err:
                self.log.error(str(err))
                self._destroy_zmq()
                await asyncio.sleep(1)
                self.log.warning("Zeromq handler reconnecting ...")
            if not self.active:
                self.log.warning("Zeromq handler terminated")
                break

    async def watchdog(self):
        """
        backup synchronization option
        in case zeromq failed
        """
        while True:
            try:
                while True:
                    await asyncio.sleep(10)
                    if int(time.time()) - self.last_zmq_msg > 300 and self.zmqContext:
                        self.log.error("ZerroMQ no messages about 5 minutes")
                        self._destroy_zmq()
                    self.loop.create_task(self.get_next_block())
            except asyncio.CancelledError:
                self.log.info("connector watchdog terminated")
                break
            except Exception as err:
                self.log.error(str(traceback.format_exc()))
                self.log.error("watchdog error %s " % err)

    async def get_next_block(self):
        """Request the block following ``last_block_height`` from the node."""
        if not self.active:
            return
        # Several callers may wait here; re-check after every wake-up so that
        # only one of them proceeds per released mutex.
        while not self.get_next_block_mutex.done():
            await self.get_next_block_mutex
        mutex = self.get_next_block_mutex = asyncio.Future()
        try:
            if self.node_last_block <= self.last_block_height + self.backlog:
                node_height = await self.rpc.getblockcount()
                if node_height == self.node_last_block:
                    self.log.info("blockchain is synchronized backlog %s" % self.backlog)
                    return
                self.node_last_block = node_height
            lag = self.node_last_block - self.last_block_height

            if lag > self.deep_sync_limit:
                if not self.deep_synchronization:
                    self.log.warning("Deep synchronization mode")
                    self.deep_synchronization = True
            elif self.deep_synchronization:
                self.log.warning("Normal synchronization mode")
                self.deep_synchronization = False
            block_hash = await self.rpc.getblockhash(self.last_block_height + 1)
            await self._get_block_by_hash(block_hash)
        except Exception as err:
            self.log.error("get next block failed %s" % str(err))
        finally:
            # release the future created by this call, not whatever the
            # attribute points to by now
            mutex.set_result(True)

    async def _get_block_by_hash(self, hash):
        """Take the block from the preload cache or the node and schedule it."""
        self.log.debug("get block by hash %s" % hash)
        try:
            block = self.block_hashes_preload.pop(hash)
            if not block:
                block = await self.rpc.getblock(hash)
            self.loop.create_task(self._new_block(block))
        except Exception:
            self.log.error("get block by hash %s FAILED" % hash)
            self.log.debug(str(traceback.format_exc()))

    async def _new_block(self, block):
        """Validate the chain position of ``block``, collect its transactions
        and run the block handlers.

        Only one block is processed at a time; ``active_block`` is resolved
        when processing has finished, whatever the outcome.
        """
        if not self.active or not self.active_block.done() or self.last_block_height >= block["height"]:
            return
        self.active_block = asyncio.Future()
        self.block_dependency_tx = 0
        block_hash = block["hash"]
        prev_block_hash = block.get("previousblockhash")
        block_height = int(block["height"])
        self.log.info("New block %s %s" % (block_height, block_hash))
        started = tm()
        try:
            # blockchain position check; the cache is keyed by the hex hash
            # that is stored below, so look it up with the same key type
            if self.block_cache.get(block_hash) is not None:
                self.log.debug("duplicated block %s" % block_hash)
                return
            if self.block_cache.get(prev_block_hash) is None and self.last_block_height:
                self.log.critical("Connector panic! Node out of sync no parent block in chain tail %s" % prev_block_hash)
                return

            if self.last_block_height + 1 != block_height:
                if self.orphan_handler:
                    tq = tm()
                    await self.orphan_handler(self.last_block_height)
                    self.log.info("orphan handler %s [%s]" % (self.last_block_height, tm(tq)))
                self.block_cache.pop_last()
                self.last_block_height -= 1
                return

            # add all block transactions
            missed = {h for h in block["tx"] if self.tx_cache.get(h) is None}

            if self.before_block_handler:
                q = time.time()
                await self.before_block_handler(block)
                self.blocks_processing_time += time.time() - q

            self.log.info("Transactions missed %s" % len(missed))
            cq = tm()
            if missed:
                self.log.debug("Request missed transactions")
                self.missed_tx_list = set(missed)
                self.await_tx_list = missed
                self.await_tx_future = {h: asyncio.Future() for h in missed}
                self.block_txs_request = asyncio.Future()
                # deep synchronization downloads the whole block at once,
                # otherwise missed transactions are requested in batches
                source = block_hash if self.deep_synchronization else False
                self.loop.create_task(self._get_missed(source, block["time"], block["height"]))
                try:
                    await asyncio.wait_for(self.block_txs_request, timeout=self.block_timeout)
                except asyncio.TimeoutError:
                    # refresh rpc connection session
                    await self._open_rpc()
                    raise RuntimeError("block transaction request timeout")
                except asyncio.CancelledError:
                    # the request was cancelled by a failed transaction worker
                    await self._open_rpc()
                    raise RuntimeError("block transaction request cancelled")

            tx_count = len(block["tx"])
            received_in = tm(cq)
            self.total_received_tx += tx_count
            self.total_received_tx_time += received_in
            # nothing to download means no elapsed time; avoid dividing by zero
            if self.total_received_tx_time:
                rate = round(self.total_received_tx / self.total_received_tx_time)
            else:
                rate = 0
            self.log.info("Transactions received: %s [%s] rate tx/s ->> %s <<" % (tx_count, received_in, rate))

            if self.block_handler:
                q = time.time()
                await self.block_handler(block)
                self.blocks_processing_time += time.time() - q
            # insert new block
            self.block_cache.set(block_hash, block["height"])
            self.last_block_height = block["height"]

            # after block added handler; its failure must not undo the block
            if self.after_block_handler:
                q = time.time()
                try:
                    await self.after_block_handler(block)
                except Exception as err:
                    self.log.error("after block handler error %s" % err)
                self.blocks_processing_time += time.time() - q
            self.blocks_processed_count += 1

            for h in block["tx"]:
                self.tx_cache.pop(h)
        except Exception as err:
            if self.await_tx_list:
                self.await_tx_list = set()
            self.log.error(str(traceback.format_exc()))
            self.log.error("new block error %s" % str(err))
        finally:
            self.active_block.set_result(True)
            self.log.info("> %s < block [%s tx/ %s size] (dp %s) processing time %s cache [%s/%s]" %
                          (block["height"],
                           len(block["tx"]),
                           block["size"] / 1000000,
                           self.block_dependency_tx,
                           tm(started),
                           len(self.block_hashes_preload._store),
                           len(self.block_preload._store)))
            if self.node_last_block > self.last_block_height:
                self.loop.create_task(self.get_next_block())

    async def _get_missed(self, block_hash=False, block_time=None, block_height=None):
        """Fetch the transactions listed in ``missed_tx_list``.

        With ``block_hash`` the whole raw block is taken from the preload
        cache or downloaded and decoded; otherwise the transactions are
        requested in RPC batches by up to ``get_missed_tx_threads_limit``
        concurrent workers.
        """
        if block_hash:
            try:
                block = self.block_preload.pop(block_hash)
                if not block:
                    t = time.time()
                    result = await self.rpc.getblock(block_hash, 0)
                    dt = time.time() - t
                    t = time.time()
                    block = decode_block_tx(result)
                    qt = time.time() - t
                    self.blocks_download_time += dt
                    self.blocks_decode_time += qt
                    self.log.info("block downloaded %s decoded %s " % (round(dt, 4), round(qt, 4)))
                for index, key in enumerate(block):
                    tx = block[key]
                    try:
                        self.missed_tx_list.remove(rh2s(tx["txId"]))
                    except (KeyError, ValueError):
                        # transaction is already known, nothing to request
                        continue
                    self.loop.create_task(self._new_transaction(tx, block_time, block_height, index))
            except Exception as err:
                self.log.error("_get_missed exception %s " % str(err))
                self.log.error(str(traceback.format_exc()))
                self._abort_block_request()

        elif self.get_missed_tx_threads <= self.get_missed_tx_threads_limit:
            self.get_missed_tx_threads += 1
            try:
                # start more threads
                if len(self.missed_tx_list) > 1:
                    self.loop.create_task(self._get_missed(False, block_time, block_height))
                while self.missed_tx_list:
                    try:
                        batch = list()
                        while self.missed_tx_list and len(batch) < self.batch_limit:
                            batch.append(["getrawtransaction", self.missed_tx_list.pop()])
                        result = await self.rpc.batch(batch)
                        for r in result:
                            try:
                                tx = Transaction(r["result"], format="raw")
                            except Exception:
                                self.log.error("Transaction decode failed: %s" % r["result"])
                                raise Exception("Transaction decode failed")
                            self.loop.create_task(self._new_transaction(tx, block_time, None, None))
                    except Exception as err:
                        self.log.error("_get_missed exception %s " % str(err))
                        self.log.error(str(traceback.format_exc()))
                        self._abort_block_request()
                        # the block request is aborted; further batches are useless
                        break
            finally:
                self.get_missed_tx_threads -= 1

    async def _new_transaction(self, tx, block_time = None, block_height = None, block_index = None):
        """Pass a transaction to ``tx_handler`` once and record it as known.

        A ``DependsTransaction`` raised by the handler postpones the
        transaction until the transaction it depends on has been added.
        """
        tx_hash = rh2s(tx["txId"])
        # the cache is keyed by the same hash string that is stored below
        if tx_hash in self.tx_in_process or self.tx_cache.get(tx_hash):
            return
        self.tx_in_process.add(tx_hash)
        try:
            ft = self.await_tx_future if block_height is not None else None

            if self.tx_handler:
                await self.tx_handler(tx, ft, block_time, block_height, block_index)

            self.tx_cache.set(tx_hash, True)
            if tx_hash in self.await_tx_list:
                self.await_tx_list.remove(tx_hash)
                future = self.await_tx_future.get(tx_hash)
                if future is not None and not future.done():
                    future.set_result(True)
                request = self.block_txs_request
                # the request may already be cancelled by a timeout
                if not self.await_tx_list and request is not None and not request.done():
                    request.set_result(True)
        except DependsTransaction as err:
            self.block_dependency_tx += 1
            self.loop.create_task(self.wait_tx_then_add(err.raw_tx_hash, tx))
        except Exception as err:
            if tx_hash in self.await_tx_list:
                self._abort_block_request()
                for future in self.await_tx_future.values():
                    if not future.done():
                        future.cancel()
            self.log.debug("new transaction error %s " % err)
            self.log.debug(str(traceback.format_exc()))
        finally:
            self.tx_in_process.discard(tx_hash)

    async def wait_tx_then_add(self, raw_tx_hash, tx):
        """Add ``tx`` again after the transaction ``raw_tx_hash`` it depends on."""
        # tx_in_process is keyed by the txId based hash
        tx_hash = rh2s(tx["txId"])
        try:
            if not self.await_tx_future[raw_tx_hash].done():
                await self.await_tx_future[raw_tx_hash]
            self.loop.create_task(self._new_transaction(tx))
        except Exception:
            self.tx_in_process.discard(tx_hash)

    async def preload_block_hashes(self):
        """Background task caching verbose blocks ahead of the current height."""
        while True:
            try:
                start_height = self.last_block_height
                node_height = await self.rpc.getblockcount()
                if node_height > start_height + 10:
                    # never ask for heights the node does not have yet
                    last_height = min(node_height, start_height + 15001)
                    for height in range(start_height + 11, last_height + 1):
                        block_hash = await self.rpc.getblockhash(height)
                        if not self.block_hashes_preload.get(block_hash):
                            block = await self.rpc.getblock(block_hash)
                            self.block_hashes_preload.set(block_hash, block)
            except asyncio.CancelledError:
                self.log.info("connector preload_block_hashes terminated")
                break
            except Exception as err:
                self.log.debug("preload_block_hashes error %s" % err)
            await asyncio.sleep(10)

    async def preload_block(self):
        """Background task caching decoded raw blocks ahead of the current height."""
        while True:
            try:
                start_height = self.last_block_height
                node_height = await self.rpc.getblockcount()
                if node_height > start_height + 10:
                    # never ask for heights the node does not have yet
                    last_height = min(node_height, start_height + 15001)
                    for height in range(start_height + 11, last_height + 1):
                        block_hash = await self.rpc.getblockhash(height)
                        if not self.block_preload.get(block_hash):
                            raw_block = await self.rpc.getblock(block_hash, 0)
                            self.block_preload.set(block_hash, decode_block_tx(raw_block))
            except asyncio.CancelledError:
                self.log.info("connector preload_block terminated")
                break
            except Exception as err:
                self.log.debug("preload_block error %s" % err)
            await asyncio.sleep(15)

    async def stop(self):
        """Stop the workers, wait for the block in progress and release resources."""
        self.active = False
        self.log.warning("New block processing restricted")
        self.log.warning("Stopping node connector ...")
        for task in self.tasks:
            task.cancel()
        if self.tasks:
            # asyncio.wait() rejects an empty collection
            await asyncio.wait(self.tasks)
        if not self.active_block.done():
            self.log.warning("Waiting active block task ...")
            await self.active_block
        if self.rpc is not None:
            await self.rpc.close()
        self._destroy_zmq()
        self.log.warning('Node connector terminated')
def get_stream(stream):
    """Return ``stream`` as an ``io.BytesIO`` object.

    Args:
        stream: an ``io.BytesIO`` (returned unchanged), ``bytes``,
            ``bytearray`` or a HEX encoded string.

    Raises:
        TypeError: for any other type.
        ValueError: when a string is not valid HEX (from ``bytes.fromhex``).
    """
    if isinstance(stream, io.BytesIO):
        return stream
    if isinstance(stream, str):
        stream = bytes.fromhex(stream)
    if isinstance(stream, (bytes, bytearray)):
        return io.BytesIO(stream)
    raise TypeError("object should be bytes or HEX encoded string")


def decode_block_tx(block):
    """Decode the transactions of a raw serialized block.

    The first 80 bytes (the block header) are skipped, then a var_int
    transaction count is read followed by that many raw transactions.

    Returns:
        dict mapping the position of a transaction in the block to its
        ``Transaction`` object.
    """
    stream = get_stream(block)
    stream.seek(80)
    tx_count = var_int_to_int(read_var_int(stream))
    return {i: Transaction(stream, format="raw") for i in range(tx_count)}


class DependsTransaction(Exception):
    """Raised by a transaction handler when the transaction depends on
    another transaction (``raw_tx_hash``) that has not been added yet."""

    def __init__(self, raw_tx_hash):
        super().__init__(raw_tx_hash)
        self.raw_tx_hash = raw_tx_hash


class Cache():
    """Insertion ordered key/value cache with bounded size and hit statistics.

    Once ``max_size`` entries are stored the cache switches to a clean-up mode
    in which every ``set()`` evicts up to 20 of the oldest entries, until the
    size drops below 75% of ``max_size``.
    """

    # number of oldest entries evicted per set() call in clean-up mode
    _EVICT_BATCH = 20

    def __init__(self, max_size=1000):
        self._store = OrderedDict()
        self._max_size = max_size
        self.clear_tail = False
        self._requests = 0
        self._hit = 0

    def set(self, key, value):
        """Store ``value`` under ``key``, evicting old entries when needed."""
        self._check_limit()
        self._store[key] = value

    def _check_limit(self):
        """Enter/leave clean-up mode and evict a batch of the oldest entries."""
        if len(self._store) >= self._max_size:
            self.clear_tail = True
        if not self.clear_tail:
            return
        if len(self._store) >= int(self._max_size * 0.75):
            # never pop more entries than are stored: a small cache holds
            # fewer than one full batch and popitem() on an empty dict raises
            for _ in range(min(self._EVICT_BATCH, len(self._store))):
                self._store.popitem(last=False)
        else:
            self.clear_tail = False

    def get(self, key):
        """Return the value stored under ``key`` or None when it is absent."""
        self._requests += 1
        try:
            value = self._store[key]
        except (KeyError, TypeError):
            # TypeError: unhashable key, treated as a miss
            return None
        self._hit += 1
        return value

    def pop(self, key):
        """Remove ``key`` and return its value, or None when it is absent."""
        self._requests += 1
        try:
            value = self._store.pop(key)
        except (KeyError, TypeError):
            return None
        self._hit += 1
        return value

    def pop_last(self):
        """Remove the most recently inserted entry and return its value
        (None when the cache is empty)."""
        try:
            return self._store.popitem(last=True)[1]
        except KeyError:
            return None

    def len(self):
        """Return the number of stored entries."""
        return len(self._store)

    def hitrate(self):
        """Return hits / lookups over all ``get``/``pop`` calls (0 before any)."""
        if self._requests:
            return self._hit / self._requests
        return 0


def tm(p=None):
    """Return the current time, or the seconds elapsed since ``p`` rounded to
    4 decimal places when ``p`` is given."""
    if p is not None:
        return round(time.time() - p, 4)
    return time.time()
Aleksey Karpov +RUN apt-get update +# install python + +RUN apt-get -y install python3 +RUN apt-get -y install python3-pip +RUN apt-get -y install git +RUN apt-get -y install build-essential libtool autotools-dev automake pkg-config libssl-dev libevent-dev + +RUN pip3 install git+https://github.com/bitaps-com/aiojsonrpc +RUN pip3 install colorlog +RUN pip3 install aiohttp +RUN pip3 install pyzmq +RUN pip3 install uvloop + +COPY ./ / +WORKDIR / + +ENTRYPOINT ["/bin/bash"] \ No newline at end of file diff --git a/pybtc/test/connector/build.sh b/pybtc/test/connector/build.sh new file mode 100644 index 0000000..e90c88f --- /dev/null +++ b/pybtc/test/connector/build.sh @@ -0,0 +1 @@ +docker build -t test-connector . \ No newline at end of file diff --git a/pybtc/test/connector/run.sh b/pybtc/test/connector/run.sh new file mode 100644 index 0000000..0c2f674 --- /dev/null +++ b/pybtc/test/connector/run.sh @@ -0,0 +1,8 @@ +docker container kill test-connector +docker container rm test-connector +docker build -t test-connector . 
import configparser
import argparse
import asyncio
import sys
import signal
import traceback
import logging
import colorlog
import uvloop
from pybtc.connector import Connector

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())


class App:
    """Manual test harness: runs a Connector against the node from test.conf."""

    def __init__(self, loop, logger, config):
        """Install the termination handlers and schedule ``start()``.

        Args:
            loop: event loop the application runs on.
            logger: logger shared with the connector.
            config: parsed configuration with a ``BITCOIND`` section holding
                the ``zeromq`` and ``rpc`` urls.
        """
        self.loop = loop
        self.log = logger
        self.connector = None
        self.config = config
        self._terminating = False
        self.log.info("Test server init ...")
        for sig in (signal.SIGINT, signal.SIGTERM):
            try:
                # runs the callback inside the loop, so creating a task is safe
                self.loop.add_signal_handler(sig, self.terminate)
            except NotImplementedError:
                # event loops without signal support
                signal.signal(sig, self.terminate)
        asyncio.ensure_future(self.start(config), loop=self.loop)

    async def start(self, config):
        """Create the connector and wait until it is connected to the node."""
        try:
            zeromq = config['BITCOIND']["zeromq"]
            rpc = config['BITCOIND']["rpc"]

            # NOTE(review): orphan_block_handler and new_transaction_handler
            # are defined below but not passed to the connector - confirm
            # whether that is intentional.
            self.connector = Connector(rpc, zeromq, self.log,
                                       before_block_handler=self.before_block_handler)
            await self.connector.connected

        except Exception:
            self.log.error("Start failed")
            self.log.error(str(traceback.format_exc()))

    async def orphan_block_handler(self, orphan_hash):
        """Log the orphan notification."""
        self.log.warning("handler remove orphan %s" % orphan_hash)

    async def before_block_handler(self, data):
        """Log the hash of every block before it is processed."""
        self.log.warning("handler new block %s" % str(data["hash"]))

    async def new_transaction_handler(self, data, ft, a, b, c):
        """Check that re-serializing a transaction reproduces its raw bytes."""
        assert data["rawTx"] == data.serialize(hex=False)

    def _exc(self, a, b, c):
        """Exception hook that suppresses tracebacks during shutdown."""
        return

    def terminate(self, a=None, b=None):
        """Signal callback; the arguments are accepted for ``signal.signal``
        compatibility and ignored."""
        if self._terminating:
            # a repeated signal must not start a second shutdown
            return
        self._terminating = True
        self.loop.create_task(self.terminate_coroutine())

    async def terminate_coroutine(self):
        """Stop the connector, then stop the event loop."""
        sys.excepthook = self._exc
        self.log.error('Stop request received')
        if self.connector:
            self.log.warning("Stop node connector")
            await self.connector.stop()
        self.log.info("Test server stopped")
        self.loop.stop()


def _pending_tasks(loop):
    """Return the unfinished tasks of ``loop`` on old and new Python versions."""
    all_tasks = getattr(asyncio, "all_tasks", None)
    if all_tasks is None:
        # Python < 3.7; Task.all_tasks() was removed in Python 3.9
        all_tasks = asyncio.Task.all_tasks
    return [task for task in all_tasks(loop) if not task.done()]


def init(loop, argv):
    """Parse ``argv``, read test.conf, configure logging and create the App.

    Args:
        loop: event loop the application will run on.
        argv: command line arguments without the program name.

    Returns:
        The created ``App`` instance.
    """
    parser = argparse.ArgumentParser(description="Test node connector ...")
    parser.parse_args(argv)
    config_file = "test.conf"
    log_level = logging.INFO
    logger = colorlog.getLogger('cn')
    config = configparser.ConfigParser()
    loaded = config.read(config_file)

    ch = logging.StreamHandler()
    ch.setLevel(log_level)
    formatter = colorlog.ColoredFormatter('%(log_color)s%(asctime)s %(levelname)s: %(message)s (%(module)s:%(lineno)d)')
    ch.setFormatter(formatter)
    logger.addHandler(ch)
    logger.setLevel(log_level)
    logger.info("Start")
    if not loaded:
        # ConfigParser.read() silently skips files it cannot open
        logger.error("Config file %s not found" % config_file)
    app = App(loop, logger, config)
    return app


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    app = init(loop, sys.argv[1:])
    loop.run_forever()
    # Whatever is still pending after the loop has been stopped would never
    # finish on its own; cancel it so that the final gather returns.
    pending = _pending_tasks(loop)
    for task in pending:
        task.cancel()
    if pending:
        loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
    loop.close()