From bfd10addba8ac06a0dbc38386bdec1db6e30f5a3 Mon Sep 17 00:00:00 2001 From: tripathyr Date: Sun, 1 Dec 2024 10:45:18 +0530 Subject: [PATCH] Create ranchimallflo_api.py --- ranchimallflo_api.py | 4458 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 4458 insertions(+) create mode 100644 ranchimallflo_api.py diff --git a/ranchimallflo_api.py b/ranchimallflo_api.py new file mode 100644 index 0000000..688cb0f --- /dev/null +++ b/ranchimallflo_api.py @@ -0,0 +1,4458 @@ +from collections import defaultdict +import json +import os +import requests +import sys +import time +from datetime import datetime +from quart import jsonify, make_response, Quart, render_template, request, flash, redirect, url_for, send_file +from quart_cors import cors +import asyncio +from typing import Optional +from config import * +import parsing +import subprocess +import pyflo +from operator import itemgetter +import pdb +import ast +import time + + + +#MYSQL ENHANCEMENTS START +import configparser +import aiohttp +import aiomysql +import asyncio +import atexit +import pymysql +from dbutils.pooled_db import PooledDB +from apscheduler.schedulers.background import BackgroundScheduler +import logging + + +# Configuration Setup +config = configparser.ConfigParser() +config.read('config.ini') + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +app = Quart(__name__) +app.clients = set() +app = cors(app, allow_origin="*") + +API_TIMEOUT = int(config['API']['API_TIMEOUT']) # 1 second +RETRY_TIMEOUT_LONG = int(config['API']['RETRY_TIMEOUT_LONG']) # 30 minutes +RETRY_TIMEOUT_SHORT = int(config['API']['RETRY_TIMEOUT_SHORT']) # 1 minute +DB_RETRY_TIMEOUT = int(config['API']['DB_RETRY_TIMEOUT']) # 1 minute + + + + +# Global connection pools +sync_connection_pool = None +async_connection_pool = None + +class MySQLConfig: + def __init__(self): + self.username = config['MYSQL'].get('USERNAME', 'default_user') + self.password = config['MYSQL'].get('PASSWORD', 'default_password') + self.host = config['MYSQL'].get('HOST', 'localhost') + self.database_prefix = config['MYSQL'].get('DATABASE_PREFIX', 'rm') + +mysql_config = MySQLConfig() +net = config['DEFAULT'].get('NET', 'mainnet') # Default to 'mainnet' if not set + +# Initialize connection pools +async def initialize_connection_pools(): + global sync_connection_pool, async_connection_pool + + try: + # Sync connection pool + sync_connection_pool = PooledDB( + creator=pymysql, + maxconnections=10, + mincached=2, + maxcached=5, + blocking=True, + host=mysql_config.host, + user=mysql_config.username, + password=mysql_config.password, + charset="utf8mb4", + ) + + # Async connection pool + async_connection_pool = await aiomysql.create_pool( + host=mysql_config.host, + user=mysql_config.username, + password=mysql_config.password, + db=None, + charset="utf8mb4", + maxsize=10, + minsize=2, + loop=asyncio.get_event_loop(), + ) + print("Connection pools initialized successfully.") + + except Exception as e: + print(f"Error initializing connection pools: {e}") + raise RuntimeError("Failed to initialize database connection pools") + +@app.before_serving +async def before_serving(): + await initialize_connection_pools() + print("Connection pools initialized before serving requests.") + +@app.after_serving +async def after_serving(): + if async_connection_pool: + async_connection_pool.close() + await async_connection_pool.wait_closed() + print("Async connection pool closed.") + + +async def get_mysql_connection(db_name, no_standardize=False, USE_ASYNC=False): + logger.info("ABC: Entering get_mysql_connection function") + conn = None # Initialize conn to None to avoid "referenced before assignment" + logger.info("ABC: Initialized conn to None") + + db_name = standardize_db_name(db_name) if not no_standardize else db_name + logger.info(f"ABC: Database name standardized to: {db_name}") + + try: + logger.info("ABC: Checking USE_ASYNC flag") + if USE_ASYNC: + logger.info("ABC: Async mode enabled") + if not async_connection_pool: + logger.error("ABC: Async connection pool not initialized") + raise RuntimeError("Async connection pool not initialized") + + logger.info("ABC: Acquiring async connection") + conn = await async_connection_pool.acquire() # Acquire connection + if conn is None: + logger.error("ABC: Failed to acquire a valid connection") + raise RuntimeError("Failed to acquire a valid async connection") + logger.info("ABC: Async connection acquired successfully") + + async with conn.cursor() as cursor: + logger.info("ABC: Creating async cursor and selecting database") + await cursor.execute(f"USE `{db_name}`") # Use escaped database name + logger.info(f"ABC: Database `{db_name}` selected successfully in async mode") + return conn # Return connection for further use + else: + logger.info("ABC: Sync mode enabled") + if not sync_connection_pool: + logger.error("ABC: Sync connection pool not initialized") + raise RuntimeError("Sync connection pool not initialized") + + logger.info("ABC: Acquiring sync connection") + conn = sync_connection_pool.connection() # Acquire sync connection + if conn is None: + logger.error("ABC: Failed to acquire a valid connection") + raise RuntimeError("Failed to acquire a valid sync connection") + logger.info("ABC: Sync connection acquired successfully") + + conn.select_db(db_name) # Select database + logger.info(f"ABC: Database `{db_name}` selected successfully in sync mode") + return conn # Return connection for further use + except Exception as e: + logger.error(f"ABC: Error in database connection: {e}") + raise + + + + + + + + + + + + +def is_backend_ready(): + """ + Dummy function that always indicates the backend is ready. + """ + return True + + +#MYSQL ENHANCEMENTS END + + + + +INTERNAL_ERROR = "Unable to process request, try again later" +BACKEND_NOT_READY_ERROR = "Server is still syncing, try again later!" +BACKEND_NOT_READY_WARNING = "Server is still syncing, data may not be final" + +# Global values and configg +internalTransactionTypes = [ 'tokenswapDepositSettlement', 'tokenswapParticipationSettlement', 'smartContractDepositReturn'] + +if net == 'mainnet': + is_testnet = False +elif net == 'testnet': + is_testnet = True + +# Validation functionss +def check_flo_address(floaddress, is_testnet=False): + return pyflo.is_address_valid(floaddress, testnet=is_testnet) + +def check_integer(value): + return str.isdigit(value) + +""" ??? NOT USED??? +# Helper functions +def retryRequest(tempserverlist, apicall): + if len(tempserverlist) != 0: + try: + response = requests.get('{}api/{}'.format(tempserverlist[0], apicall)) + except: + tempserverlist.pop(0) + return retryRequest(tempserverlist, apicall) + else: + if response.status_code == 200: + return json.loads(response.content) + else: + tempserverlist.pop(0) + return retryRequest(tempserverlist, apicall) + else: + print("None of the APIs are responding for the call {}".format(apicall)) + sys.exit(0) + + +def multiRequest(apicall, net): + testserverlist = ['http://0.0.0.0:9000/', 'https://testnet.flocha.in/', 'https://testnet-flosight.duckdns.org/'] + mainserverlist = ['http://0.0.0.0:9001/', 'https://livenet.flocha.in/', 'https://testnet-flosight.duckdns.org/'] + if net == 'mainnet': + return retryRequest(mainserverlist, apicall) + elif net == 'testnet': + return retryRequest(testserverlist, apicall) +""" + + +async def blockdetailhelper(blockdetail): + # Determine whether the input is blockHash or blockHeight + if blockdetail.isdigit(): + blockHash = None + blockHeight = int(blockdetail) + else: + blockHash = str(blockdetail) + blockHeight = None + + # Get the database connection for the "latestCache" DB asynchronously + conn = await get_mysql_connection("latestCache", USE_ASYNC=True) # Use the async connection pool + + try: + async with conn.cursor() as cursor: + # Query the database based on blockHash or blockHeight + if blockHash: + query = "SELECT jsonData FROM latestBlocks WHERE blockHash = %s" + await cursor.execute(query, (blockHash,)) + elif blockHeight: + query = "SELECT jsonData FROM latestBlocks WHERE blockNumber = %s" + await cursor.execute(query, (blockHeight,)) + else: + raise ValueError("Invalid blockdetail input. Must be blockHash or blockHeight.") + + # Fetch the result asynchronously + result = await cursor.fetchall() + + except aiomysql.MySQLError as e: + print(f"Error querying database: {e}") + result = [] + + finally: + # Release the connection back to the pool + await conn.commit() + + return result + + +async def transactiondetailhelper(transactionHash): + # Get the database connection for the "latestCache" DB asynchronously + conn = await get_mysql_connection("latestCache", USE_ASYNC=True) # Use the async connection pool + + try: + async with conn.cursor() as cursor: + # Query the database for the transaction hash + query = """ + SELECT jsonData, parsedFloData, transactionType, db_reference + FROM latestTransactions + WHERE transactionHash = %s + """ + await cursor.execute(query, (transactionHash,)) + + # Fetch the result asynchronously + transactionJsonData = await cursor.fetchall() + + except aiomysql.MySQLError as e: + print(f"Error querying database: {e}") + transactionJsonData = [] + + finally: + # Release the connection back to the pool + await conn.commit() + + return transactionJsonData + +#ATTEMPT 1 +# async def update_transaction_confirmations(transactionJson): +# url = f"{apiUrl}api/v1/tx/{transactionJson['txid']}" +# try: +# async with aiohttp.ClientSession() as session: +# async with session.get(url) as response: +# if response.status == 200: +# response_data = await response.json() +# transactionJson['confirmations'] = response_data['confirmations'] +# except Exception as e: +# print(f"Error fetching transaction confirmation: {e}") +# return transactionJson + +# ATTEMPT 2 +# async def update_transaction_confirmations(transactionJson): +# try: +# # Simulate the response without making an actual API call as it is slowing down +# transactionJson['confirmations'] = transactionJson['confirmations'] +# logger.info(f"Mock confirmation set for transaction {transactionJson['txid']}: {transactionJson['confirmations']} confirmations") +# except Exception as e: +# print(f"Error updating transaction confirmation: {e}") +# return transactionJson + +#ATTEMPT 3 +# async def update_transaction_confirmations(transactionJson): +# url = f"{apiUrl}api/v1/tx/{transactionJson['txid']}" +# try: +# timeout = aiohttp.ClientTimeout(total=API_TIMEOUT) +# async with aiohttp.ClientSession(timeout=timeout) as session: +# async with session.get(url) as response: +# if response.status == 200: +# response_data = await response.json() +# transactionJson['confirmations'] = response_data['confirmations'] +# else: +# print(f"API error: {response.status}") +# except asyncio.TimeoutError: +# print(f"Request timed out after {API_TIMEOUT} seconds") +# except Exception as e: +# print(f"Error fetching transaction confirmation: {e}") +# return transactionJson + + +async def update_transaction_confirmations(transactionJson): + if transactionJson.get('confirmations', 0) >= 100: + return transactionJson + + url = f"{apiUrl}api/v1/tx/{transactionJson['txid']}" + try: + timeout = aiohttp.ClientTimeout(total=API_TIMEOUT) + async with aiohttp.ClientSession(timeout=timeout) as session: + async with session.get(url) as response: + if response.status == 200: + response_data = await response.json() + transactionJson['confirmations'] = response_data['confirmations'] + else: + print(f"API error: {response.status}") + except asyncio.TimeoutError: + print(f"Request timed out after {API_TIMEOUT} seconds") + except Exception as e: + print(f"Error fetching transaction confirmation: {e}") + return transactionJson + + + +async def smartcontract_morph_helper(smart_contracts): + contractList = [] + for idx, contract in enumerate(smart_contracts): + contractDict = {} + contractDict['contractName'] = contract[1] + contractDict['contractAddress'] = contract[2] + contractDict['status'] = contract[3] + contractDict['contractType'] = contract[5] + + if contractDict['contractType'] in ['continuous-event', 'continuos-event']: + contractDict['contractSubType'] = 'tokenswap' + accepting_selling_tokens = ast.literal_eval(contract[4]) + contractDict['acceptingToken'] = accepting_selling_tokens[0] + contractDict['sellingToken'] = accepting_selling_tokens[1] + + # Awaiting async calls + contractStructure = await fetchContractStructure(contractDict['contractName'], contractDict['contractAddress']) + if contractStructure['pricetype'] == 'dynamic': + # temp fix + if 'oracle_address' in contractStructure.keys(): + contractDict['oracle_address'] = contractStructure['oracle_address'] + contractDict['price'] = await fetch_dynamic_swap_price(contractStructure, {'time': datetime.now().timestamp()}) + else: + contractDict['price'] = contractStructure['price'] + + elif contractDict['contractType'] == 'one-time-event': + contractDict['tokenIdentification'] = contract[4] + # Awaiting async call + contractStructure = await fetchContractStructure(contractDict['contractName'], contractDict['contractAddress']) + + if 'payeeAddress' in contractStructure.keys(): + contractDict['contractSubType'] = 'time-trigger' + else: + choice_list = [] + for obj_key in contractStructure['exitconditions'].keys(): + choice_list.append(contractStructure['exitconditions'][obj_key]) + contractDict['userChoices'] = choice_list + contractDict['contractSubType'] = 'external-trigger' + contractDict['expiryDate'] = contract[9] + + contractDict['closeDate'] = contract[10] + + contractDict['transactionHash'] = contract[6] + contractDict['blockNumber'] = contract[7] + contractDict['incorporationDate'] = contract[8] + contractList.append(contractDict) + + return contractList + + +def return_smart_contracts(connection, contractName=None, contractAddress=None): + + cursor = connection.cursor() + query = """ + SELECT * FROM activecontracts + WHERE id IN ( + SELECT MAX(id) FROM activecontracts GROUP BY contractName, contractAddress + ) + """ + conditions = [] + params = [] + + if contractName: + conditions.append("contractName = %s") + params.append(contractName) + if contractAddress: + conditions.append("contractAddress = %s") + params.append(contractAddress) + + if conditions: + query += " AND " + " AND ".join(conditions) + + try: + cursor.execute(query, params) + smart_contracts = cursor.fetchall() + except Exception as e: + print(f"Error fetching smart contracts: {e}") + smart_contracts = [] + finally: + cursor.close() + + return smart_contracts + + +async def fetchContractStructure(contractName, contractAddress): + """ + Fetches the structure of a smart contract from the MySQL database. + + Args: + contractName (str): The name of the contract. + contractAddress (str): The address of the contract. + + Returns: + dict: The contract structure if found, or 0 if the database does not exist. + """ + # Construct the database name dynamically + db_name = f"{contractName.strip()}_{contractAddress.strip()}" + + # Get the database connection asynchronously + conn = await get_mysql_connection(db_name, USE_ASYNC=True) # Use async connection pool + + try: + # Use async cursor + async with conn.cursor() as cursor: + # Fetch contract structure from the database asynchronously + await cursor.execute('SELECT attribute, value FROM contractstructure') + result = await cursor.fetchall() + + contractStructure = {} + conditionDict = {} + counter = 0 + + for item in result: + attribute, value = item + if attribute == 'exitconditions': + conditionDict[counter] = value + counter += 1 + else: + contractStructure[attribute] = value + + if conditionDict: + contractStructure['exitconditions'] = conditionDict + + # Convert specific fields to appropriate data types + if 'contractAmount' in contractStructure: + contractStructure['contractAmount'] = float(contractStructure['contractAmount']) + if 'payeeAddress' in contractStructure: + contractStructure['payeeAddress'] = json.loads(contractStructure['payeeAddress']) + if 'maximumsubscriptionamount' in contractStructure: + contractStructure['maximumsubscriptionamount'] = float(contractStructure['maximumsubscriptionamount']) + if 'minimumsubscriptionamount' in contractStructure: + contractStructure['minimumsubscriptionamount'] = float(contractStructure['minimumsubscriptionamount']) + if 'price' in contractStructure: + contractStructure['price'] = float(contractStructure['price']) + + return contractStructure + + except aiomysql.MySQLError as e: + print(f"Database error while fetching contract structure: {e}") + return 0 + except Exception as e: + print(f"Unexpected error: {e}") + return 0 + finally: + # Release the connection back to the pool (commit if necessary) + await conn.commit() + +async def fetchContractStatus(contractName, contractAddress): + try: + # Get the system database connection asynchronously + conn = await get_mysql_connection('system', USE_ASYNC=True) # Using async connection pool + + async with conn.cursor() as cursor: + # Query to fetch the contract status + query = """ + SELECT status + FROM activecontracts + WHERE contractName = %s AND contractAddress = %s + ORDER BY id DESC + LIMIT 1 + """ + await cursor.execute(query, (contractName, contractAddress)) + status = await cursor.fetchone() + + # Return the status if found, otherwise None + if status: + return status[0] + else: + return None + + except aiomysql.MySQLError as e: + print(f"Database error while fetching contract status: {e}") + return None + except Exception as e: + print(f"Unexpected error: {e}") + return None + finally: + # Ensure the connection is released back to the pool + await conn.commit() # Commit if needed (to handle connection lifecycle) + + + +def extract_ip_op_addresses(transactionJson): + sender_address = transactionJson['vin'][0]['addresses'][0] + receiver_address = None + for utxo in transactionJson['vout']: + if utxo['scriptPubKey']['addresses'][0] == sender_address: + continue + receiver_address = utxo['scriptPubKey']['addresses'][0] + return sender_address, receiver_address + + +async def updatePrices(): + """ + Updates the latest price data for various currency pairs in the MySQL database asynchronously. + """ + prices = {} + + # USD -> INR + try: + async with aiohttp.ClientSession() as session: + async with session.get("https://api.exchangerate-api.com/v4/latest/usd", timeout=10) as response: + if response.status == 200: + price = await response.json() + prices['USDINR'] = price['rates']['INR'] + except Exception as e: + print(f"Error fetching USD to INR exchange rate: {e}") + + # Blockchain stuff: BTC, FLO -> USD, INR + # BTC -> USD | BTC -> INR + try: + async with aiohttp.ClientSession() as session: + async with session.get("https://api.coingecko.com/api/v3/simple/price?ids=bitcoin,flo&vs_currencies=usd,inr", timeout=10) as response: + if response.status == 200: + price = await response.json() + prices['BTCUSD'] = price['bitcoin']['usd'] + prices['BTCINR'] = price['bitcoin']['inr'] + except Exception as e: + print(f"Error fetching BTC prices: {e}") + + # FLO -> USD | FLO -> INR + try: + async with aiohttp.ClientSession() as session: + async with session.get("https://api.coinlore.net/api/ticker/?id=67", timeout=10) as response: + if response.status == 200: + price = await response.json() + prices["FLOUSD"] = float(price[0]['price_usd']) + if 'USDINR' in prices: + prices["FLOINR"] = float(prices["FLOUSD"]) * float(prices['USDINR']) + except Exception as e: + print(f"Error fetching FLO prices: {e}") + + # Log the updated prices + print('Prices updated at time: %s' % datetime.now()) + print(prices) + + # Update prices in the database asynchronously + try: + # Use async MySQL connection + conn = await get_mysql_connection('system', USE_ASYNC=True) + + async with conn.cursor() as cursor: + # Update each rate pair in the database asynchronously + for pair, price in prices.items(): + await cursor.execute( + "UPDATE ratepairs SET price = %s WHERE ratepair = %s", + (price, pair) + ) + + await conn.commit() + print("Prices successfully updated in the database.") + + except aiomysql.MySQLError as e: + print(f"Database error while updating prices: {e}") + except Exception as e: + print(f"Unexpected error: {e}") + + +async def fetch_dynamic_swap_price(contractStructure, blockinfo): + oracle_address = contractStructure['oracle_address'] + print(f'Oracle address is: {oracle_address}') + + async def send_api_request(url): + timeout = aiohttp.ClientTimeout(total=API_TIMEOUT) + try: + async with aiohttp.ClientSession(timeout=timeout) as session: + async with session.get(url) as response: + if response.status == 200: + return await response.json() + else: + print(f'API error: {response.status}') + return None + except asyncio.TimeoutError: + print(f"Request timed out after {API_TIMEOUT} seconds") + return None + except Exception as e: + print(f"Error during API request: {e}") + return None + + try: + # Fetch transactions associated with the oracle address + url = f'{apiUrl}api/v1/addr/{oracle_address}' + response_data = await send_api_request(url) + if response_data is None: + return None + + if 'transactions' not in response_data: + return float(contractStructure['price']) + + transactions = response_data['transactions'] + for transaction_hash in transactions: + transaction_url = f'{apiUrl}api/v1/tx/{transaction_hash}' + transaction_response = await send_api_request(transaction_url) + if transaction_response is None: + continue + + transaction = transaction_response + floData = transaction.get('floData', None) + + if not floData or transaction['time'] >= blockinfo['time']: + continue + + try: + sender_address, receiver_address = find_sender_receiver(transaction) + assert receiver_address == contractStructure['contractAddress'] + assert sender_address == oracle_address + + floData = json.loads(floData) + assert floData['price-update']['contract-name'] == contractStructure['contractName'] + assert floData['price-update']['contract-address'] == contractStructure['contractAddress'] + + return float(floData['price-update']['price']) + except Exception as e: + print(f"Error processing transaction: {e}") + continue + + # If no matching transaction is found, return the default price + return float(contractStructure['price']) + except Exception as e: + print(f"Error in fetch_dynamic_swap_price: {e}") + return None + + + +def find_sender_receiver(transaction_data): + # Create vinlist and outputlist + vinlist = [] + querylist = [] + + #totalinputval = 0 + #inputadd = '' + + # todo Rule 40 - For each vin, find the feeding address and the fed value. Make an inputlist containing [inputaddress, n value] + for vin in transaction_data["vin"]: + vinlist.append([vin["addr"], float(vin["value"])]) + + totalinputval = float(transaction_data["valueIn"]) + + # todo Rule 41 - Check if all the addresses in a transaction on the input side are the same + for idx, item in enumerate(vinlist): + if idx == 0: + temp = item[0] + continue + if item[0] != temp: + print(f"System has found more than one address as part of vin. Transaction {transaction_data['txid']} is rejected") + return 0 + + inputlist = [vinlist[0][0], totalinputval] + inputadd = vinlist[0][0] + + # todo Rule 42 - If the number of vout is more than 2, reject the transaction + if len(transaction_data["vout"]) > 2: + print(f"System has found more than 2 address as part of vout. Transaction {transaction_data['txid']} is rejected") + return 0 + + # todo Rule 43 - A transaction accepted by the system has two vouts, 1. The FLO address of the receiver + # 2. Flo address of the sender as change address. If the vout address is change address, then the other adddress + # is the recevier address + + outputlist = [] + addresscounter = 0 + inputcounter = 0 + for obj in transaction_data["vout"]: + if obj["scriptPubKey"]["type"] == "pubkeyhash": + addresscounter = addresscounter + 1 + if inputlist[0] == obj["scriptPubKey"]["addresses"][0]: + inputcounter = inputcounter + 1 + continue + outputlist.append([obj["scriptPubKey"]["addresses"][0], obj["value"]]) + + if addresscounter == inputcounter: + outputlist = [inputlist[0]] + elif len(outputlist) != 1: + print(f"Transaction's change is not coming back to the input address. Transaction {transaction_data['txid']} is rejected") + return 0 + else: + outputlist = outputlist[0] + + return inputlist[0], outputlist[0] + +async def fetch_contract_status_time_info(contractName, contractAddress): + try: + # Connect to the system database asynchronously + conn = await get_mysql_connection('system', USE_ASYNC=True) # Use async connection pool + + async with conn.cursor() as cursor: + # Query to fetch contract status and time info + query = """ + SELECT status, incorporationDate, expiryDate, closeDate + FROM activecontracts + WHERE contractName = %s AND contractAddress = %s + ORDER BY id DESC + LIMIT 1 + """ + await cursor.execute(query, (contractName, contractAddress)) + contract_status_time_info = await cursor.fetchone() + + # Return the result or an empty list if no data is found + return contract_status_time_info if contract_status_time_info else [] + + except aiomysql.MySQLError as e: + print(f"Database error while fetching contract status and time info: {e}") + return [] + except Exception as e: + print(f"Unexpected error: {e}") + return [] + finally: + # Ensure that the connection is properly released + await conn.commit() + + + +def checkIF_commitee_trigger_tranasaction(transactionDetails): + if transactionDetails[3] == 'trigger': + pass + +async def transaction_post_processing(transactionJsonData): + rowarray_list = [] + + for row in transactionJsonData: + transactions_object = {} + parsedFloData = json.loads(row[1]) + transactionDetails = json.loads(row[0]) + + if row[3] in internalTransactionTypes or (row[3] == 'trigger' and row[8] != 'committee'): + internal_info = { + 'senderAddress': row[4], + 'receiverAddress': row[5], + 'tokenAmount': row[6], + 'tokenIdentification': row[7], + 'contractName': parsedFloData['contractName'], + 'transactionTrigger': transactionDetails['txid'], + 'time': transactionDetails['time'], + 'type': row[3], + 'onChain': False + } + transactions_object = internal_info + else: + transactions_object = {**parsedFloData, **transactionDetails} + # Awaiting the asynchronous update_transaction_confirmations function + transactions_object = await update_transaction_confirmations(transactions_object) + transactions_object['onChain'] = True + + rowarray_list.append(transactions_object) + + return rowarray_list + + + +def standardize_db_name(db_name): + """ + Ensures the database name has the proper prefix and suffix. + + Args: + db_name (str): The logical database name. + + Returns: + str: The standardized database name. + """ + if not (db_name.startswith(f"{mysql_config.database_prefix}_") and db_name.endswith("_db")): + db_name = f"{mysql_config.database_prefix}_{db_name}_db" + return db_name + + + + +async def fetch_transactions_from_token(token_name, floAddress, limit=None): + """ + Fetch transactions for a specific token and FLO address. + + Args: + token_name (str): The name of the token. + floAddress (str): The FLO address. + limit (int, optional): Maximum number of transactions to fetch. + + Returns: + list: List of transactions for the token. + """ + token_db_name = standardize_db_name(token_name) + + try: + # Acquire a connection using `async with` to manage cleanup + async with await get_mysql_connection(token_db_name, USE_ASYNC=True) as conn: + # Create a cursor using `async with` for cleanup + async with conn.cursor(dictionary=True) as cursor: + # Query to fetch transactions + query = """ + SELECT jsonData, parsedFloData, time, transactionType, sourceFloAddress, destFloAddress, + transferAmount, %s AS token, '' AS transactionSubType + FROM transactionHistory + WHERE sourceFloAddress = %s OR destFloAddress = %s + """ + parameters = [token_name, floAddress, floAddress] + + # Add LIMIT clause if provided + if limit is not None: + query += " LIMIT %s" + parameters.append(limit) + + # Execute the query and fetch all results + await cursor.execute(query, parameters) + return await cursor.fetchall() + + except Exception as e: + # Log the error and re-raise it + logger.error(f"Error in fetch_transactions_from_token: {e}", exc_info=True) + raise + + +async def fetch_token_transactions(tokens, senderFloAddress=None, destFloAddress=None, limit=None, use_and=False): + """ + Fetch transactions for multiple tokens (or a single token). + + Args: + tokens (list or str): List of token names or a single token name. + senderFloAddress (str, optional): Sender FLO address. + destFloAddress (str, optional): Destination FLO address. + limit (int, optional): Maximum number of transactions. + use_and (bool, optional): Use AND or OR for filtering. + + Returns: + list: Combined list of transactions for all tokens. + """ + # Automatically convert a single token name into a list + if isinstance(tokens, str): + tokens = [tokens] + + if not tokens or not isinstance(tokens, list): + return jsonify(description="Invalid or missing tokens"), 400 + + try: + # Fetch transactions in parallel for all tokens + tasks = [ + fetch_transactions_from_token( + token, + senderFloAddress or destFloAddress, # Use either sender or destination address + limit + ) + for token in tokens + ] + results = await asyncio.gather(*tasks) + + # Combine and process results + all_transactions = [] + for result in results: + all_transactions.extend(result) + + # Post-process and return transactions + return await transaction_post_processing(all_transactions) + + except Exception as e: + print(f"Unexpected error while fetching token transactions: {e}") + return jsonify(description="Unexpected error occurred"), 500 + + + + +async def fetch_contract_transactions(contractName, contractAddress, _from=0, to=100, USE_ASYNC=False): + """ + Fetches transactions related to a smart contract and associated tokens asynchronously. + + Args: + contractName (str): Name of the smart contract. + contractAddress (str): Address of the smart contract. + _from (int, optional): Starting index for transactions. Defaults to 0. + to (int, optional): Ending index for transactions. Defaults to 100. + USE_ASYNC (bool, optional): Flag to use async connection. + + Returns: + list: Processed transactions. + """ + try: + # Standardize smart contract database name + sc_db_name = standardize_db_name(f"{contractName}_{contractAddress}") + + # Fetch contract structure asynchronously + contractStructure = await fetchContractStructure(contractName, contractAddress) + if not contractStructure: + return jsonify(description="Invalid contract structure"), 404 + + transactionJsonData = [] + creation_tx_query = """ + SELECT jsonData, parsedFloData, time, transactionType, sourceFloAddress, destFloAddress, + transferAmount, '' AS token, transactionSubType + FROM contractTransactionHistory + ORDER BY id + LIMIT 1; + """ + + # Open the connection asynchronously (using the appropriate connection pool) + conn_sc = await get_mysql_connection(sc_db_name, USE_ASYNC=USE_ASYNC) + async with conn_sc.cursor() as cursor_sc: + # Fetch creation transaction asynchronously + await cursor_sc.execute(creation_tx_query) + creation_tx = await cursor_sc.fetchall() + transactionJsonData = creation_tx + + # Fetch token transactions concurrently for continuous or one-time event contracts + if contractStructure['contractType'] == 'continuos-event': + token1 = contractStructure['accepting_token'] + token2 = contractStructure['selling_token'] + + # Fetch transactions for token1 and token2 concurrently + transaction_results_token1, transaction_results_token2 = await asyncio.gather( + fetch_token_transactions_for_contract(token1, sc_db_name, _from, to, USE_ASYNC), + fetch_token_transactions_for_contract(token2, sc_db_name, _from, to, USE_ASYNC) + ) + + # Combine results for token1 and token2 + transactionJsonData += transaction_results_token1 + transaction_results_token2 + + elif contractStructure['contractType'] == 'one-time-event': + token1 = contractStructure['tokenIdentification'] + + # Fetch transactions for one-time event contract + result = await fetch_token_transactions_for_contract(token1, sc_db_name, _from, to, USE_ASYNC) + transactionJsonData += result + + # Post-process and return transactions + return await transaction_post_processing(transactionJsonData) + + except aiomysql.MySQLError as e: + print(f"Database error while fetching contract transactions: {e}") + return jsonify(description="Database error"), 500 + except Exception as e: + print(f"Unexpected error: {e}") + return jsonify(description="Unexpected error occurred"), 500 + + +async def fetch_token_transactions_for_contract(token_name, sc_db_name, _from, to, USE_ASYNC=False): + """ + Fetches transactions for a specific token and smart contract database asynchronously. + + Args: + token_name (str): The name of the token. + sc_db_name (str): The name of the smart contract database. + _from (int): Starting index for transactions. + to (int): Ending index for transactions. + USE_ASYNC (bool, optional): Flag to use async connection. + + Returns: + list: List of transactions for the token. + """ + token_db_name = standardize_db_name(token_name) + try: + # Open separate connections for sc_db_name and token_db_name + conn_token = await get_mysql_connection(token_db_name, USE_ASYNC=USE_ASYNC) + conn_sc = await get_mysql_connection(sc_db_name, USE_ASYNC=USE_ASYNC) + + # Fetch data from both connections and perform the join in-memory + async with conn_sc.cursor(dictionary=True) as cursor_sc, conn_token.cursor(dictionary=True) as cursor_token: + # Fetch data from contractTransactionHistory (sc_db_name) + query_sc = """ + SELECT transactionHash, transactionSubType, id + FROM contractTransactionHistory + WHERE id BETWEEN %s AND %s + """ + await cursor_sc.execute(query_sc, (_from, to)) + sc_transactions = await cursor_sc.fetchall() + + # Fetch data from transactionHistory (token_db_name) + transaction_hashes = tuple(tx['transactionHash'] for tx in sc_transactions) + query_token = f""" + SELECT jsonData, parsedFloData, time, transactionType, sourceFloAddress, destFloAddress, + transferAmount, '{token_name}' AS token + FROM transactionHistory + WHERE transactionHash IN %s + """ + await cursor_token.execute(query_token, (transaction_hashes,)) + token_transactions = await cursor_token.fetchall() + + # Combine results from both queries + combined_transactions = [] + for sc_tx in sc_transactions: + matching_tx = next((tx for tx in token_transactions if tx['transactionHash'] == sc_tx['transactionHash']), None) + if matching_tx: + combined_tx = {**matching_tx, 'transactionSubType': sc_tx['transactionSubType']} + combined_transactions.append(combined_tx) + + return combined_transactions + + except aiomysql.MySQLError as e: + print(f"Error fetching transactions for token {token_name}: {e}") + return [] + + +async def fetch_swap_contract_transactions(contractName, contractAddress, transactionHash=None, USE_ASYNC=False): + """ + Fetches swap contract transactions involving two tokens asynchronously. + + Args: + contractName (str): Name of the swap contract. + contractAddress (str): Address of the swap contract. + transactionHash (str, optional): Specific transaction hash to filter transactions. + USE_ASYNC (bool, optional): Whether to use async connection for database interactions. + + Returns: + list: Processed transactions. + """ + try: + # Standardize smart contract database name + sc_db_name = standardize_db_name(f"{contractName}_{contractAddress}") + + # Fetch contract structure + contractStructure = await fetchContractStructure(contractName, contractAddress) + if not contractStructure: + return jsonify(description="Invalid contract structure"), 404 + + # Get token names + token1 = contractStructure['accepting_token'] + token2 = contractStructure['selling_token'] + + # Fetch contract transactions from contractTransactionHistory + contract_transactions_query = """ + SELECT transactionHash, transactionSubType + FROM contractTransactionHistory + """ + if transactionHash: + contract_transactions_query += " WHERE transactionHash = %s" + + # Open connection to smart contract database asynchronously + async with await get_mysql_connection(sc_db_name, USE_ASYNC=USE_ASYNC) as conn_sc: + async with conn_sc.cursor(dictionary=True) as cursor_sc: + # Execute contract transactions query + if transactionHash: + await cursor_sc.execute(contract_transactions_query, (transactionHash,)) + else: + await cursor_sc.execute(contract_transactions_query) + + contract_transactions = await cursor_sc.fetchall() + transaction_hashes = [tx["transactionHash"] for tx in contract_transactions] + + # Open connections to token databases concurrently + async with await get_mysql_connection(standardize_db_name(token1), USE_ASYNC=USE_ASYNC) as conn_token1, \ + await get_mysql_connection(standardize_db_name(token2), USE_ASYNC=USE_ASYNC) as conn_token2: + + # Fetch transactions for token1 and token2 concurrently + token1_query = f""" + SELECT jsonData, parsedFloData, time, transactionType, sourceFloAddress, destFloAddress, + transferAmount, '{token1}' AS token + FROM transactionHistory + WHERE transactionHash IN ({','.join(['%s'] * len(transaction_hashes))}) + """ + token2_query = f""" + SELECT jsonData, parsedFloData, time, transactionType, sourceFloAddress, destFloAddress, + transferAmount, '{token2}' AS token + FROM transactionHistory + WHERE transactionHash IN ({','.join(['%s'] * len(transaction_hashes))}) + """ + + async with conn_token1.cursor(dictionary=True) as cursor_token1, \ + conn_token2.cursor(dictionary=True) as cursor_token2: + await asyncio.gather( + cursor_token1.execute(token1_query, transaction_hashes), + cursor_token2.execute(token2_query, transaction_hashes) + ) + + token1_transactions, token2_transactions = await asyncio.gather( + cursor_token1.fetchall(), + cursor_token2.fetchall() + ) + + # Combine and post-process transactions + all_transactions = token1_transactions + token2_transactions + for tx in all_transactions: + for contract_tx in contract_transactions: + if tx["transactionHash"] == contract_tx["transactionHash"]: + tx["transactionSubType"] = contract_tx["transactionSubType"] + break + + # Return the processed transactions + return await transaction_post_processing(all_transactions) + + except aiomysql.MySQLError as e: + logger.error(f"Database error while fetching swap contract transactions: {e}", exc_info=True) + return jsonify(description="Database error"), 500 + except Exception as e: + logger.error(f"Unexpected error: {e}", exc_info=True) + return jsonify(description="Unexpected error occurred"), 500 + + + + +def sort_transactions(transactionJsonData): + transactionJsonData = sorted(transactionJsonData, key=lambda x: x['time'], reverse=True) + return transactionJsonData + +def process_committee_flodata(flodata): + flo_address_list = [] + try: + contract_committee_actions = flodata['token-tracker']['contract-committee'] + except KeyError: + print('Flodata related to contract committee') + else: + # Adding first and removing later to maintain consistency and not to depend on floData for order of execution + for action in contract_committee_actions.keys(): + if action == 'add': + for floid in contract_committee_actions[f'{action}']: + flo_address_list.append(floid) + + for action in contract_committee_actions.keys(): + if action == 'remove': + for floid in contract_committee_actions[f'{action}']: + flo_address_list.remove(floid) + finally: + return flo_address_list + + +async def refresh_committee_list(admin_flo_id, api_url, blocktime): + committee_list = [] + latest_param = 'true' + mempool_param = 'false' + init_id = None + + async def process_transaction(transaction_info): + if 'isCoinBase' in transaction_info or transaction_info['vin'][0]['addresses'][0] != admin_flo_id or transaction_info['blocktime'] > blocktime: + return + try: + tx_flodata = json.loads(transaction_info['floData']) + committee_list.extend(process_committee_flodata(tx_flodata)) + except Exception as e: + print(f"Error processing transaction: {e}") + + async def send_api_request(url): + timeout = aiohttp.ClientTimeout(total=API_TIMEOUT) + try: + async with aiohttp.ClientSession(timeout=timeout) as session: + async with session.get(url, ssl=API_VERIFY) as response: + if response.status == 200: + return await response.json() + else: + print('Response from the Blockbook API failed') + raise RuntimeError(f"API request failed with status {response.status}") + except asyncio.TimeoutError: + print(f"Request timed out after {API_TIMEOUT} seconds") + return ["timeout"] + except Exception as e: + print(f"Error during API request: {e}") + return None + + url = f'{api_url}api/v1/address/{admin_flo_id}?details=txs' + response = await send_api_request(url) + if response == ["timeout"]: + return ["timeout"] + + if response is None: + return [] + + for transaction_info in response.get('txs', []): + await process_transaction(transaction_info) + + while 'incomplete' in response: + url = f'{api_url}api/v1/address/{admin_flo_id}/txs?latest={latest_param}&mempool={mempool_param}&before={init_id}' + response = await send_api_request(url) + if response == ["timeout"]: + return ["timeout"] + if response is None: + return [] + for transaction_info in response.get('items', []): + await process_transaction(transaction_info) + if 'incomplete' in response: + init_id = response['initItem'] + + return committee_list + + + + +@app.route('/') +async def welcome_msg(): + return jsonify('Welcome to RanchiMall FLO Api v2') + + +@app.route('/api/v1.0/getSystemData', methods=['GET']) +async def systemData(): + try: + # Standardized database names + system_db_name = standardize_db_name("system") + latest_cache_db_name = standardize_db_name("latestCache") + + # Query for the number of FLO addresses in tokenAddress mapping + async with await get_mysql_connection(system_db_name, USE_ASYNC=True) as conn_system: + async with conn_system.cursor() as cursor_system: + await cursor_system.execute('SELECT COUNT(DISTINCT tokenAddress) FROM tokenAddressMapping') + tokenAddressCount = (await cursor_system.fetchone())[0] + + await cursor_system.execute('SELECT COUNT(DISTINCT token) FROM tokenAddressMapping') + tokenCount = (await cursor_system.fetchone())[0] + + await cursor_system.execute('SELECT COUNT(DISTINCT contractName) FROM contractAddressMapping') + contractCount = (await cursor_system.fetchone())[0] + + await cursor_system.execute("SELECT value FROM systemData WHERE attribute='lastblockscanned'") + lastscannedblock = int((await cursor_system.fetchone())[0]) + + # Query for total number of validated blocks + async with await get_mysql_connection(latest_cache_db_name, USE_ASYNC=True) as conn_cache: + async with conn_cache.cursor() as cursor_cache: + await cursor_cache.execute('SELECT COUNT(DISTINCT blockNumber) FROM latestBlocks') + validatedBlockCount = (await cursor_cache.fetchone())[0] + + await cursor_cache.execute('SELECT COUNT(DISTINCT transactionHash) FROM latestTransactions') + validatedTransactionCount = (await cursor_cache.fetchone())[0] + + # Return the system data as JSON + return jsonify( + systemAddressCount=tokenAddressCount, + systemBlockCount=validatedBlockCount, + systemTransactionCount=validatedTransactionCount, + systemSmartContractCount=contractCount, + systemTokenCount=tokenCount, + lastscannedblock=lastscannedblock, + result='ok' + ) + + except Exception as e: + logger.error(f"Error in systemData function: {e}", exc_info=True) + return jsonify(result='error', description=INTERNAL_ERROR) + + + + + + +@app.route('/api/v1.0/broadcastTx/') +async def broadcastTx(raw_transaction_hash): + try: + p1 = subprocess.run(['flo-cli',f"-datadir={FLO_DATA_DIR}",'sendrawtransaction',raw_transaction_hash], capture_output=True) + return jsonify(args=p1.args,returncode=p1.returncode,stdout=p1.stdout.decode(),stderr=p1.stderr.decode()) + except Exception as e: + print("broadcastTx:", e) + return jsonify(result='error', description=INTERNAL_ERROR) + + +@app.route('/api/v1.0/getTokenList', methods=['GET']) +async def getTokenList(): + if not is_backend_ready(): + return jsonify(result='error', description=BACKEND_NOT_READY_ERROR) + + try: + # Prefix and suffix for databases + database_prefix = f"{mysql_config.database_prefix}_" + database_suffix = "_db" + + # Initialize token list + token_list = [] + + # Use `async with` to handle connection cleanup automatically + async with await get_mysql_connection("information_schema", no_standardize=True, USE_ASYNC=True) as conn: + async with conn.cursor() as cursor: + # Query to get all databases matching the prefix and suffix + query = f"SELECT SCHEMA_NAME FROM SCHEMATA WHERE SCHEMA_NAME LIKE '{database_prefix}%' AND SCHEMA_NAME LIKE '%{database_suffix}'" + await cursor.execute(query) + all_databases = [row[0] for row in await cursor.fetchall()] + + # Separate token databases from smart contract databases + for db_name in all_databases: + # Remove the prefix and suffix for parsing + stripped_name = db_name[len(database_prefix):-len(database_suffix)] + + # Exclude "latestCache" and "system" databases + if stripped_name in ["latestCache", "system"]: + continue + + # Token databases will not contain an address-like string + parts = stripped_name.split('_') + if len(parts) == 1: # Token databases have a single part (e.g., usd, inr) + token_list.append(stripped_name) + elif len(parts) == 2 and len(parts[1]) == 34 and parts[1].startswith('F'): + # Smart contracts are excluded + continue + + # Return the token list in the original format + return jsonify(tokens=token_list, result='ok') + + except Exception as e: + logger.error(f"getTokenList: {e}", exc_info=True) + return jsonify(result='error', description=INTERNAL_ERROR), 500 + + +@app.route('/api/v1.0/getTokenInfo', methods=['GET']) +async def getTokenInfo(): + try: + token = request.args.get('token') + if token is None: + return jsonify(result='error', description='token name hasn\'t been passed'), 400 + + # Standardize token database name + db_name = standardize_db_name(token) + + # Connect to the token database asynchronously + async with await get_mysql_connection(db_name, USE_ASYNC=True) as conn: + async with conn.cursor() as cursor: + # Fetch incorporation details + query = "SELECT * FROM transactionHistory WHERE id = 1" + await cursor.execute(query) + incorporationRow = await cursor.fetchone() + if not incorporationRow: + return jsonify(result='error', description='Incorporation details not found'), 404 + + # Fetch the number of distinct addresses + query = "SELECT COUNT(DISTINCT address) FROM activeTable" + await cursor.execute(query) + numberOf_distinctAddresses = (await cursor.fetchone())[0] + + # Fetch the total number of transactions + query = "SELECT MAX(id) FROM transactionHistory" + await cursor.execute(query) + numberOf_transactions = (await cursor.fetchone())[0] + + # Fetch associated contracts + query = """ + SELECT contractName, contractAddress, blockNumber, blockHash, transactionHash + FROM tokenContractAssociation + """ + await cursor.execute(query) + associatedContracts = await cursor.fetchall() + + # Prepare associated contract list + associatedContractList = [ + { + 'contractName': item[0], + 'contractAddress': item[1], + 'blockNumber': item[2], + 'blockHash': item[3], + 'transactionHash': item[4], + } + for item in associatedContracts + ] + + # Prepare the response + response = { + 'result': 'ok', + 'token': token, + 'incorporationAddress': incorporationRow[1], + 'tokenSupply': incorporationRow[3], + 'time': incorporationRow[6], + 'blockchainReference': incorporationRow[7], + 'activeAddress_no': numberOf_distinctAddresses, + 'totalTransactions': numberOf_transactions, + 'associatedSmartContracts': associatedContractList, + } + + if not is_backend_ready(): + response['warning'] = BACKEND_NOT_READY_WARNING + return jsonify(response), 206 + else: + return jsonify(response), 200 + + except Exception as e: + logger.error(f"getTokenInfo: {e}", exc_info=True) + return jsonify(result='error', description=INTERNAL_ERROR), 500 + + + +@app.route('/api/v1.0/getTokenTransactions', methods=['GET']) +async def getTokenTransactions(): + try: + token = request.args.get('token') + senderFloAddress = request.args.get('senderFloAddress') + destFloAddress = request.args.get('destFloAddress') + limit = request.args.get('limit') + + if token is None: + return jsonify(result='error', description='token name hasn\'t been passed'), 400 + + # Standardize token database name + db_name = standardize_db_name(token) + + # Connect to the token database asynchronously + async with await get_mysql_connection(db_name, USE_ASYNC=True) as conn: + async with conn.cursor() as cursor: + # Build the base query + query = "SELECT jsonData, parsedFloData FROM transactionHistory" + conditions = [] + params = [] + + # Add filters based on sender and destination addresses + if senderFloAddress and not destFloAddress: + conditions.append("sourceFloAddress = %s") + params.append(senderFloAddress) + elif not senderFloAddress and destFloAddress: + conditions.append("destFloAddress = %s") + params.append(destFloAddress) + elif senderFloAddress and destFloAddress: + conditions.append("sourceFloAddress = %s AND destFloAddress = %s") + params.extend([senderFloAddress, destFloAddress]) + + # Add conditions to the query + if conditions: + query += " WHERE " + " AND ".join(conditions) + + # Add ordering and limit + query += " ORDER BY id DESC" + if limit is not None: + if not limit.isdigit(): + return jsonify(result='error', description='limit validation failed'), 400 + query += " LIMIT %s" + params.append(int(limit)) + + # Execute the query asynchronously + await cursor.execute(query, params) + transactionJsonData = await cursor.fetchall() + + # Process the results + rowarray_list = {} + for row in transactionJsonData: + transactions_object = {} + transactions_object['transactionDetails'] = json.loads(row[0]) + transactions_object['transactionDetails'] = await update_transaction_confirmations(transactions_object['transactionDetails']) + transactions_object['parsedFloData'] = json.loads(row[1]) + rowarray_list[transactions_object['transactionDetails']['txid']] = transactions_object + + # Return the response + if not is_backend_ready(): + return jsonify(result='ok', token=token, transactions=rowarray_list, warning=BACKEND_NOT_READY_WARNING), 206 + else: + return jsonify(result='ok', token=token, transactions=rowarray_list), 200 + + except Exception as e: + logger.error(f"getTokenTransactions: {e}", exc_info=True) + return jsonify(result='error', description=INTERNAL_ERROR), 500 + + +@app.route('/api/v1.0/getTokenBalances', methods=['GET']) +async def getTokenBalances(): + try: + token = request.args.get('token') + if token is None: + return jsonify(result='error', description='token name hasn\'t been passed'), 400 + + # Standardize token database name + db_name = standardize_db_name(token) + + # Connect to the token database asynchronously + async with await get_mysql_connection(db_name, USE_ASYNC=True) as conn: + async with conn.cursor() as cursor: + # Fetch address balances grouped by address asynchronously + query = "SELECT address, SUM(transferBalance) FROM activeTable GROUP BY address" + await cursor.execute(query) + addressBalances = await cursor.fetchall() + + # Prepare the response + returnList = {address[0]: address[1] for address in addressBalances} + + if not is_backend_ready(): + return jsonify(result='ok', token=token, balances=returnList, warning=BACKEND_NOT_READY_WARNING), 206 + else: + return jsonify(result='ok', token=token, balances=returnList), 200 + + except Exception as e: + logger.error(f"getTokenBalances: {e}", exc_info=True) + return jsonify(result='error', description=INTERNAL_ERROR), 500 + + +# FLO Address APIs +@app.route('/api/v1.0/getFloAddressInfo', methods=['GET']) +async def getFloAddressInfo(): + try: + floAddress = request.args.get('floAddress') + if floAddress is None: + return jsonify(description='floAddress hasn\'t been passed'), 400 + + # Connect to the system database asynchronously + db_name = standardize_db_name("system") + async with await get_mysql_connection(db_name, USE_ASYNC=True) as conn: + async with conn.cursor() as cursor: + # Fetch associated tokens asynchronously + query = "SELECT token FROM tokenAddressMapping WHERE tokenAddress = %s" + await cursor.execute(query, (floAddress,)) + tokenNames = await cursor.fetchall() + + # Fetch incorporated contracts asynchronously + query = """ + SELECT contractName, status, tokenIdentification, contractType, transactionHash, blockNumber, blockHash + FROM activecontracts + WHERE contractAddress = %s + """ + await cursor.execute(query, (floAddress,)) + incorporatedContracts = await cursor.fetchall() + + # Prepare token details by querying each token database separately asynchronously + detailList = {} + if tokenNames: + tasks = [] + for token in tokenNames: + token_name = token[0] + token_db_name = standardize_db_name(token_name) + + tasks.append(fetch_token_balance(token_name, token_db_name, floAddress)) + + # Run all tasks concurrently + token_balances = await asyncio.gather(*tasks) + + # Add token balances to the details list + for balance in token_balances: + detailList.update(balance) + + else: + # Address is not associated with any token + if not is_backend_ready(): + return jsonify(result='error', description=BACKEND_NOT_READY_ERROR), 503 + else: + return jsonify(result='error', description='FLO address is not associated with any tokens'), 404 + + # Prepare contract details asynchronously + incorporatedSmartContracts = [] + if incorporatedContracts: + for contract in incorporatedContracts: + tempdict = { + 'contractName': contract[0], + 'contractAddress': floAddress, + 'status': contract[1], + 'tokenIdentification': contract[2], + 'contractType': contract[3], + 'transactionHash': contract[4], + 'blockNumber': contract[5], + 'blockHash': contract[6], + } + incorporatedSmartContracts.append(tempdict) + else: + incorporatedSmartContracts = None + + # Return the response + response = { + 'result': 'ok', + 'floAddress': floAddress, + 'floAddressBalances': detailList, + 'incorporatedSmartContracts': incorporatedSmartContracts, + } + + if not is_backend_ready(): + response['warning'] = BACKEND_NOT_READY_WARNING + return jsonify(response), 206 + else: + return jsonify(response), 200 + + except Exception as e: + logger.error(f"getFloAddressInfo: {e}", exc_info=True) + return jsonify(result='error', description=INTERNAL_ERROR), 500 + + + +async def fetch_token_balance(token_name, floAddress): + """ + Fetches the balance for a specific token in the provided token database. + + Args: + token_name (str): The name of the token. + floAddress (str): The FLO address to fetch the balance for. + + Returns: + dict: A dictionary with the token balance. + """ + try: + token_db_name = standardize_db_name(token_name) + + # Use `async with` for resource management + async with await get_mysql_connection(token_db_name, USE_ASYNC=True) as conn_token: + async with conn_token.cursor() as cursor_token: + + # Fetch balance for the token asynchronously + query = "SELECT SUM(transferBalance) FROM activeTable WHERE address = %s" + await cursor_token.execute(query, (floAddress,)) + balance = (await cursor_token.fetchone())[0] or 0 + + return {token_name: {'balance': balance, 'token': token_name}} + + except Exception as e: + print(f"Error fetching balance for token {token_name}: {e}") + return {token_name: {'balance': 0, 'token': token_name}} + + + + +@app.route('/api/v1.0/getFloAddressBalance', methods=['GET']) +async def getAddressBalance(): + try: + floAddress = request.args.get('floAddress') + token = request.args.get('token') + + if floAddress is None: + return jsonify(result='error', description='floAddress hasn\'t been passed'), 400 + + if token is None: + # If no specific token is provided, get balances for all associated tokens + db_name = standardize_db_name("system") + async with await get_mysql_connection(db_name, USE_ASYNC=True) as conn: + async with conn.cursor() as cursor: + # Fetch associated tokens asynchronously + query = "SELECT token FROM tokenAddressMapping WHERE tokenAddress = %s" + await cursor.execute(query, (floAddress,)) + tokenNames = [row[0] for row in await cursor.fetchall()] + + if tokenNames: + # Use asyncio to fetch balances in parallel + tasks = [fetch_token_balance(token_name, floAddress) for token_name in tokenNames] + results = await asyncio.gather(*tasks) + + # Combine results into detailList + detailList = {k: v for result in results for k, v in result.items()} + + if not is_backend_ready(): + return jsonify(result='ok', warning=BACKEND_NOT_READY_WARNING, floAddress=floAddress, floAddressBalances=detailList), 206 + else: + return jsonify(result='ok', floAddress=floAddress, floAddressBalances=detailList), 200 + + else: + # Address is not associated with any token + if not is_backend_ready(): + return jsonify(result='error', description=BACKEND_NOT_READY_ERROR), 503 + else: + return jsonify(result='error', description='FLO address is not associated with any tokens'), 404 + + else: + # If a specific token is provided, get the balance for that token + token_db_name = standardize_db_name(token) + async with await get_mysql_connection(token_db_name, USE_ASYNC=True) as conn: + async with conn.cursor() as cursor: + # Fetch balance for the address asynchronously + query = "SELECT SUM(transferBalance) FROM activeTable WHERE address = %s" + await cursor.execute(query, (floAddress,)) + balance = (await cursor.fetchone())[0] or 0 + + if not is_backend_ready(): + return jsonify(result='ok', warning=BACKEND_NOT_READY_WARNING, token=token, floAddress=floAddress, balance=balance), 206 + else: + return jsonify(result='ok', token=token, floAddress=floAddress, balance=balance), 200 + + except Exception as e: + print("getAddressBalance:", e) + return jsonify(result='error', description=INTERNAL_ERROR), 500 + + +@app.route('/api/v1.0/getFloAddressTransactions', methods=['GET']) +async def getFloAddressTransactions(): + try: + floAddress = request.args.get('floAddress') + token = request.args.get('token') + limit = request.args.get('limit') + + if floAddress is None: + return jsonify(result='error', description='floAddress has not been passed'), 400 + + # Handle token-specific or all-token scenarios + if token is None: + # Fetch all tokens associated with the floAddress + db_name = standardize_db_name("system") + async with await get_mysql_connection(db_name, USE_ASYNC=True) as conn: + async with conn.cursor() as cursor: + query = "SELECT token FROM tokenAddressMapping WHERE tokenAddress = %s" + await cursor.execute(query, (floAddress,)) + tokenNames = await cursor.fetchall() + + else: + # Check if the token database exists + token_db_name = standardize_db_name(token) + try: + async with await get_mysql_connection(token_db_name, USE_ASYNC=True): + tokenNames = [(token,)] + except Exception: + if not is_backend_ready(): + return jsonify(result='error', description=BACKEND_NOT_READY_ERROR), 503 + else: + return jsonify(result='error', description='Token does not exist'), 404 + + # Process transactions for the tokens + if tokenNames: + allTransactionList = {} + tasks = [] # List to hold async tasks for fetching transactions + + for token_row in tokenNames: + tokenname = token_row[0] + token_db_name = standardize_db_name(tokenname) + + tasks.append(fetch_transactions_for_token(token_db_name, floAddress, limit)) + + # Run all tasks concurrently + transaction_data = await asyncio.gather(*tasks) + + # Process fetched transactions + for data in transaction_data: + for row in data: + transactions_object = {} + transactions_object['transactionDetails'] = json.loads(row[0]) + transactions_object['transactionDetails'] = await update_transaction_confirmations( + transactions_object['transactionDetails'] + ) + transactions_object['parsedFloData'] = json.loads(row[1]) + allTransactionList[transactions_object['transactionDetails']['txid']] = transactions_object + + # Construct response based on token presence + response = { + 'result': 'ok', + 'floAddress': floAddress, + 'transactions': allTransactionList + } + if token is not None: + response['token'] = token + if not is_backend_ready(): + response['warning'] = BACKEND_NOT_READY_WARNING + return jsonify(response), 206 + else: + return jsonify(response), 200 + + else: + # No token transactions associated with this address + if not is_backend_ready(): + return jsonify(result='error', description=BACKEND_NOT_READY_ERROR), 503 + else: + return jsonify(result='error', description='No token transactions present on this address'), 404 + + except Exception as e: + print("getFloAddressTransactions:", e) + return jsonify(result='error', description=INTERNAL_ERROR), 500 + + +async def fetch_transactions_for_token(token_db_name, floAddress, limit): + """ + Fetches transactions for a specific token database. + + Args: + token_db_name (str): The token database name. + floAddress (str): The FLO address to filter transactions. + limit (int): The limit for the number of transactions. + + Returns: + list: List of transactions for the token. + """ + try: + async with await get_mysql_connection(token_db_name, USE_ASYNC=True) as conn_token: + async with conn_token.cursor() as cursor_token: + # Build query to fetch transactions + query = """ + SELECT jsonData, parsedFloData + FROM transactionHistory + WHERE sourceFloAddress = %s OR destFloAddress = %s + ORDER BY id DESC + """ + params = (floAddress, floAddress) + + if limit: + query += " LIMIT %s" + params = (floAddress, floAddress, int(limit)) + + await cursor_token.execute(query, params) + return await cursor_token.fetchall() + + except Exception as e: + print(f"Error fetching transactions for token {token_db_name}: {e}") + return [] + + + + + +# SMART CONTRACT APIs +@app.route('/api/v1.0/getSmartContractList', methods=['GET']) +async def getContractList(): + try: + # Get query parameters + contractName = request.args.get('contractName') + contractAddress = request.args.get('contractAddress') + + # Standardize system database name + system_db_name = standardize_db_name("system") + + # Initialize contract list + contractList = [] + + # Connect to the system database asynchronously and manage it with async context + async with await get_mysql_connection(system_db_name, USE_ASYNC=True) as conn: + async with conn.cursor() as cursor: + + # Build the query dynamically based on input parameters + query = "SELECT * FROM activecontracts" + conditions = [] + params = [] + + if contractName: + conditions.append("contractName=%s") + params.append(contractName) + + if contractAddress: + conditions.append("contractAddress=%s") + params.append(contractAddress) + + if conditions: + query += " WHERE " + " AND ".join(conditions) + + # Execute the query asynchronously + await cursor.execute(query, tuple(params)) + allcontractsDetailList = await cursor.fetchall() + + # Process the results asynchronously + for contract in allcontractsDetailList: + contractDict = { + 'contractName': contract[1], + 'contractAddress': contract[2], + 'status': contract[3], + 'tokenIdentification': contract[4], + 'contractType': contract[5], + 'transactionHash': contract[6], + 'blockNumber': contract[7], + 'incorporationDate': contract[8], + } + if contract[9]: + contractDict['expiryDate'] = contract[9] + if contract[10]: + contractDict['closeDate'] = contract[10] + + contractList.append(contractDict) + + # Check backend readiness and return response + if not is_backend_ready(): + return jsonify(smartContracts=contractList, result='ok', warning=BACKEND_NOT_READY_WARNING), 206 + else: + return jsonify(smartContracts=contractList, result='ok'), 200 + + except Exception as e: + print("getContractList:", e) + return jsonify(result='error', description=INTERNAL_ERROR), 500 + + + +@app.route('/api/v1.0/getSmartContractInfo', methods=['GET'], endpoint='getSmartContractInfoV1') +async def getContractInfo(): + logger.info("Entering getContractInfo function") + try: + # Get query parameters + contractName = request.args.get('contractName') + contractAddress = request.args.get('contractAddress') + + if not contractName: + logger.error("Contract name is missing") + return jsonify(result='error', description="Smart Contract's name hasn't been passed"), 400 + if not contractAddress: + logger.error("Contract address is missing") + return jsonify(result='error', description="Smart Contract's address hasn't been passed"), 400 + + # Standardize the smart contract database name + contract_db_name = standardize_db_name(f"{contractName}_{contractAddress}") + logger.info(f"Standardized contract database name: {contract_db_name}") + + # Initialize response dictionary + returnval = {} + + # Fetch contract structure + try: + async with await get_mysql_connection(contract_db_name, USE_ASYNC=True) as contract_conn: + async with contract_conn.cursor() as cursor: + await cursor.execute("SELECT attribute, value FROM contractstructure") + result = await cursor.fetchall() + if not result: + logger.error("No contract structure found") + return jsonify(result='error', description="No contract structure found for the specified smart contract"), 404 + + # Process contract structure data + contractStructure = {} + conditionDict = {} + for item in result: + if item[0] == 'exitconditions': + conditionDict[len(conditionDict)] = item[1] + else: + contractStructure[item[0]] = item[1] + + if conditionDict: + contractStructure['exitconditions'] = conditionDict + + # Update the response dictionary + returnval.update(contractStructure) + if 'exitconditions' in returnval: + returnval['userChoice'] = returnval.pop('exitconditions') + + except Exception as e: + logger.error(f"Error fetching contract structure: {e}", exc_info=True) + return jsonify(result='error', description="Failed to fetch contract structure"), 500 + + # Return the final response + logger.info("Returning final response with contract information") + return jsonify(result='ok', contractName=contractName, contractAddress=contractAddress, contractInfo=returnval), 200 + + except Exception as e: + logger.error(f"Unhandled error in getContractInfo: {e}", exc_info=True) + return jsonify(result='error', description="Internal Server Error"), 500 + + + +@app.route('/api/v1.0/getSmartContractParticipants', methods=['GET']) +async def getcontractparticipants(): + try: + contractName = request.args.get('contractName') + contractAddress = request.args.get('contractAddress') + + if not contractName: + return jsonify(result='error', description="Smart Contract's name hasn't been passed"), 400 + + if not contractAddress: + return jsonify(result='error', description="Smart Contract's address hasn't been passed"), 400 + + contractName = contractName.strip().lower() + contractAddress = contractAddress.strip() + + # Standardize smart contract database name + contract_db_name = standardize_db_name(f"{contractName}_{contractAddress}") + + # Fetch contract structure asynchronously + contractStructure = await fetchContractStructure(contractName, contractAddress) + + # Initialize response + returnval = {} + + # Get an async connection to the contract database + async with await get_mysql_connection(contract_db_name, USE_ASYNC=True) as conn: + async with conn.cursor() as cursor: + + # Handle external trigger contracts + if 'exitconditions' in contractStructure: + await cursor.execute('SELECT * FROM contractTransactionHistory WHERE transactionType="trigger"') + triggers = await cursor.fetchall() + + if len(triggers) == 1: + await cursor.execute('SELECT value FROM contractstructure WHERE attribute="tokenIdentification"') + token = (await cursor.fetchone())[0] + + await cursor.execute('SELECT id, participantAddress, tokenAmount, userChoice, transactionHash, winningAmount FROM contractparticipants') + result = await cursor.fetchall() + for row in result: + returnval[row[1]] = { + 'participantFloAddress': row[1], + 'tokenAmount': row[2], + 'userChoice': row[3], + 'transactionHash': row[4], + 'winningAmount': row[5], + 'tokenIdentification': token + } + elif len(triggers) == 0: + await cursor.execute('SELECT id, participantAddress, tokenAmount, userChoice, transactionHash FROM contractparticipants') + result = await cursor.fetchall() + for row in result: + returnval[row[1]] = { + 'participantFloAddress': row[1], + 'tokenAmount': row[2], + 'userChoice': row[3], + 'transactionHash': row[4] + } + else: + return jsonify(result='error', description='More than 1 trigger present. This is unusual, please check your code'), 500 + + # Handle internal trigger contracts + elif 'payeeAddress' in contractStructure: + await cursor.execute('SELECT id, participantAddress, tokenAmount, userChoice, transactionHash FROM contractparticipants') + result = await cursor.fetchall() + for row in result: + returnval[row[1]] = { + 'participantFloAddress': row[1], + 'tokenAmount': row[2], + 'userChoice': row[3], + 'transactionHash': row[4] + } + + # Handle continuous-event contracts with token swaps + elif contractStructure['contractType'] == 'continuos-event' and contractStructure['subtype'] == 'tokenswap': + await cursor.execute('SELECT * FROM contractparticipants') + contract_participants = await cursor.fetchall() + for row in contract_participants: + returnval[row[1]] = { + 'participantFloAddress': row[1], + 'participationAmount': row[2], + 'swapPrice': float(row[3]), + 'transactionHash': row[4], + 'blockNumber': row[5], + 'blockHash': row[6], + 'swapAmount': row[7] + } + + # Final response + if not is_backend_ready(): + return jsonify(result='ok', warning=BACKEND_NOT_READY_WARNING, contractName=contractName, contractAddress=contractAddress, participantInfo=returnval), 206 + else: + return jsonify(result='ok', contractName=contractName, contractAddress=contractAddress, participantInfo=returnval), 200 + + except Exception as e: + print("getcontractparticipants:", e) + return jsonify(result='error', description=INTERNAL_ERROR), 500 + + + + + +@app.route('/api/v1.0/getParticipantDetails', methods=['GET'], endpoint='getParticipantDetailsV1') +async def getParticipantDetails(): + try: + floAddress = request.args.get('floAddress') + contractName = request.args.get('contractName') + contractAddress = request.args.get('contractAddress') + + if not floAddress: + return jsonify(result='error', description="FLO address hasn't been passed"), 400 + + if (contractName and not contractAddress) or (not contractName and contractAddress): + return jsonify(result='error', description='Pass both contractName and contractAddress as URL parameters'), 400 + + floAddress = floAddress.strip() + if contractName: + contractName = contractName.strip().lower() + if contractAddress: + contractAddress = contractAddress.strip() + + # Standardize the contract database name + system_db_name = standardize_db_name("system") + contract_db_name = standardize_db_name(f"{contractName}_{contractAddress}") if contractName and contractAddress else None + + # Connect to the system database asynchronously + async with await get_mysql_connection(system_db_name, USE_ASYNC=True) as conn_system: + async with conn_system.cursor() as cursor_system: + # Query contract participation details for the FLO address + query = """ + SELECT * + FROM contractAddressMapping + WHERE address = %s AND addressType = 'participant' + """ + params = [floAddress] + + if contractName and contractAddress: + query += " AND contractName = %s AND contractAddress = %s" + params.extend([contractName, contractAddress]) + + await cursor_system.execute(query, tuple(params)) + participant_address_contracts = await cursor_system.fetchall() + + if not participant_address_contracts: + if not is_backend_ready(): + return jsonify(result='error', description=BACKEND_NOT_READY_ERROR), 503 + else: + return jsonify(result='error', description="Address hasn't participated in any contract"), 404 + + participationDetailsList = [] + for contract in participant_address_contracts: + detailsDict = {} + contract_name = contract[3] + contract_address = contract[4] + db_name = standardize_db_name(f"{contract_name}_{contract_address}") + + # Fetch participation details from the contract database + async with await get_mysql_connection(db_name, USE_ASYNC=True) as conn_contract: + async with conn_contract.cursor() as cursor_contract: + # Fetch contract structure asynchronously + contractStructure = await fetchContractStructure(contract_name, contract_address) + + if contractStructure['contractType'] == 'continuos-event' and contractStructure['subtype'] == 'tokenswap': + await cursor_contract.execute("SELECT * FROM contractparticipants WHERE participantAddress = %s", (floAddress,)) + participant_details = await cursor_contract.fetchall() + participationList = [] + for participation in participant_details: + detailsDict = { + 'participationAddress': floAddress, + 'participationAmount': participation[2], + 'receivedAmount': float(participation[3]), + 'participationToken': contractStructure['accepting_token'], + 'receivedToken': contractStructure['selling_token'], + 'swapPrice_received_to_participation': float(participation[7]), + 'transactionHash': participation[4], + 'blockNumber': participation[5], + 'blockHash': participation[6], + } + participationList.append(detailsDict) + participationDetailsList.append(participationList) + + elif contractStructure['contractType'] == 'one-time-event' and 'payeeAddress' in contractStructure: + detailsDict['contractName'] = contract_name + detailsDict['contractAddress'] = contract_address + + await cursor_system.execute(''' + SELECT status, tokenIdentification, contractType, blockNumber, blockHash, incorporationDate, expiryDate, closeDate + FROM activecontracts + WHERE contractName = %s AND contractAddress = %s + ''', (contract_name, contract_address)) + temp = await cursor_system.fetchone() + detailsDict.update({ + 'status': temp[0], + 'tokenIdentification': temp[1], + 'contractType': temp[2], + 'blockNumber': temp[3], + 'blockHash': temp[4], + 'incorporationDate': temp[5], + 'expiryDate': temp[6], + 'closeDate': temp[7] + }) + + await cursor_contract.execute("SELECT tokenAmount FROM contractparticipants WHERE participantAddress = %s", (floAddress,)) + result = await cursor_contract.fetchone() + detailsDict['tokenAmount'] = result[0] + participationDetailsList.append(detailsDict) + + elif contractStructure['contractType'] == 'one-time-event' and 'exitconditions' in contractStructure: + detailsDict['contractName'] = contract_name + detailsDict['contractAddress'] = contract_address + + await cursor_system.execute(''' + SELECT status, tokenIdentification, contractType, blockNumber, blockHash, incorporationDate, expiryDate, closeDate + FROM activecontracts + WHERE contractName = %s AND contractAddress = %s + ''', (contract_name, contract_address)) + temp = await cursor_system.fetchone() + detailsDict.update({ + 'status': temp[0], + 'tokenIdentification': temp[1], + 'contractType': temp[2], + 'blockNumber': temp[3], + 'blockHash': temp[4], + 'incorporationDate': temp[5], + 'expiryDate': temp[6], + 'closeDate': temp[7] + }) + + await cursor_contract.execute(""" + SELECT userChoice, winningAmount + FROM contractparticipants + WHERE participantAddress = %s + """, (floAddress,)) + result = await cursor_contract.fetchone() + detailsDict['userChoice'] = result[0] + detailsDict['winningAmount'] = result[1] + participationDetailsList.append(detailsDict) + + # Final response based on backend readiness + if not is_backend_ready(): + return jsonify( + warning=BACKEND_NOT_READY_WARNING, + floAddress=floAddress, + type='participant', + participatedContracts=participationDetailsList + ), 206 + else: + return jsonify( + floAddress=floAddress, + type='participant', + participatedContracts=participationDetailsList + ), 200 + + except Exception as e: + print("getParticipantDetails:", e) + return jsonify(description="Unexpected error occurred"), 500 + + + + + +@app.route('/api/v1.0/getSmartContractTransactions', methods=['GET']) +async def getsmartcontracttransactions(): + try: + contractName = request.args.get('contractName') + contractAddress = request.args.get('contractAddress') + + if not contractName: + return jsonify(result='error', description="Smart Contract's name hasn't been passed"), 400 + + if not contractAddress: + return jsonify(result='error', description="Smart Contract's address hasn't been passed"), 400 + + # Standardize the contract database name + contract_db_name = standardize_db_name(f"{contractName.strip()}_{contractAddress.strip()}") + + # Establish asynchronous connection to the contract database + async with await get_mysql_connection(contract_db_name, USE_ASYNC=True) as conn: + async with conn.cursor() as cursor: + # Fetch contract transaction data asynchronously + query = ''' + SELECT jsonData, parsedFloData + FROM contractTransactionHistory + ''' + await cursor.execute(query) + result = await cursor.fetchall() + + # Process the fetched transaction data + returnval = {} + for item in result: + transactions_object = {} + transactions_object['transactionDetails'] = json.loads(item[0]) + transactions_object['transactionDetails'] = await update_transaction_confirmations(transactions_object['transactionDetails']) + transactions_object['parsedFloData'] = json.loads(item[1]) + returnval[transactions_object['transactionDetails']['txid']] = transactions_object + + # Check backend readiness + if not is_backend_ready(): + return jsonify( + result='ok', + warning=BACKEND_NOT_READY_WARNING, + contractName=contractName, + contractAddress=contractAddress, + contractTransactions=returnval + ), 206 + else: + return jsonify( + result='ok', + contractName=contractName, + contractAddress=contractAddress, + contractTransactions=returnval + ), 200 + + except aiomysql.MySQLError as e: + print(f"MySQL error while fetching transactions: {e}") + return jsonify(result='error', description='Database error occurred'), 500 + + except Exception as e: + print("getSmartContractTransactions:", e) + return jsonify(result='error', description=INTERNAL_ERROR), 500 + + + + + +@app.route('/api/v1.0/getBlockDetails/', methods=['GET']) +async def getblockdetails(blockdetail): + try: + blockJson = await blockdetailhelper(blockdetail) + if len(blockJson) != 0: + blockJson = json.loads(blockJson[0][0]) + return jsonify(result='ok', blockDetails=blockJson) + else: + if not is_backend_ready(): + return jsonify(result='error', description=BACKEND_NOT_READY_ERROR) + else: + return jsonify(result='error', description='Block doesn\'t exist in database') + except Exception as e: + print("getblockdetails:", e) + return jsonify(result='error', description=INTERNAL_ERROR) + + +@app.route('/api/v1.0/getTransactionDetails/', methods=['GET']) +async def gettransactiondetails(transactionHash): + try: + transactionJsonData = await transactiondetailhelper(transactionHash) + if len(transactionJsonData) != 0: + transactionJson = json.loads(transactionJsonData[0][0]) + transactionJson = await update_transaction_confirmations(transactionJson) + parseResult = json.loads(transactionJsonData[0][1]) + + return jsonify(parsedFloData=parseResult, transactionDetails=transactionJson, transactionHash=transactionHash, result='ok') + else: + if not is_backend_ready(): + return jsonify(result='error', description=BACKEND_NOT_READY_ERROR) + else: + return jsonify(result='error', description='Transaction doesn\'t exist in database') + except Exception as e: + print("gettransactiondetails:", e) + return jsonify(result='error', description=INTERNAL_ERROR) + + +@app.route('/api/v1.0/getLatestTransactionDetails', methods=['GET']) +async def getLatestTransactionDetails(): + try: + numberOfLatestBlocks = request.args.get('numberOfLatestBlocks') + + # Standardize the database name and connect to MySQL asynchronously + db_name = standardize_db_name('latestCache') + + # Use `async with` for resource management + async with await get_mysql_connection(db_name, USE_ASYNC=True) as conn: + async with conn.cursor() as cursor: + tempdict = {} + + if numberOfLatestBlocks is not None: + # Fetch transactions from the latest blocks based on the specified limit + query = ''' + SELECT * + FROM latestTransactions + WHERE blockNumber IN ( + SELECT DISTINCT blockNumber + FROM latestTransactions + ORDER BY blockNumber DESC + LIMIT %s + ) + ORDER BY id ASC; + ''' + await cursor.execute(query, (int(numberOfLatestBlocks),)) + else: + # Fetch transactions from all distinct latest blocks + query = ''' + SELECT * + FROM latestTransactions + WHERE blockNumber IN ( + SELECT DISTINCT blockNumber + FROM latestTransactions + ORDER BY blockNumber DESC + ) + ORDER BY id ASC; + ''' + await cursor.execute(query) + + # Fetch the transactions asynchronously + latestTransactions = await cursor.fetchall() + + # Process each transaction + tempdict = {} + for item in latestTransactions: + item = list(item) + tx_parsed_details = {} + + # Parse transaction details + tx_parsed_details['transactionDetails'] = json.loads(item[3]) + tx_parsed_details['transactionDetails'] = await update_transaction_confirmations(tx_parsed_details['transactionDetails']) + tx_parsed_details['parsedFloData'] = json.loads(item[5]) + tx_parsed_details['parsedFloData']['transactionType'] = item[4] + tx_parsed_details['transactionDetails']['blockheight'] = int(item[2]) + + # Merge parsed details and transaction details + tx_parsed_details = {**tx_parsed_details['transactionDetails'], **tx_parsed_details['parsedFloData']} + + # Add on-chain flag + tx_parsed_details['onChain'] = True + + # Add transaction to the dictionary using txid as the key + tempdict[json.loads(item[3])['txid']] = tx_parsed_details + + # Respond based on backend readiness + if not is_backend_ready(): + return jsonify( + result='ok', + warning=BACKEND_NOT_READY_WARNING, + latestTransactions=tempdict + ), 206 + else: + return jsonify(result='ok', latestTransactions=tempdict), 200 + + except Exception as e: + print("getLatestTransactionDetails:", e) + return jsonify(result='error', description=INTERNAL_ERROR), 500 + + + + + +@app.route('/api/v1.0/getLatestBlockDetails', methods=['GET']) +async def getLatestBlockDetails(): + try: + limit = request.args.get('limit') + + # Standardize the database name + db_name = standardize_db_name('latestCache') + + # Use `async with` for resource management + async with await get_mysql_connection(db_name, USE_ASYNC=True) as conn: + async with conn.cursor() as cursor: + # Default query for the latest 4 blocks if no limit is provided + query = ''' + SELECT * + FROM ( + SELECT * + FROM latestBlocks + ORDER BY blockNumber DESC + LIMIT %s + ) AS subquery + ORDER BY id ASC; + ''' + + # Set the limit for the query + limit = int(limit) if limit is not None else 4 + + # Execute the query + await cursor.execute(query, (limit,)) + latestBlocks = await cursor.fetchall() + + # Parse the block details + tempdict = {} + for item in latestBlocks: + tempdict[json.loads(item[3])['hash']] = json.loads(item[3]) + + # Respond based on backend readiness + if not is_backend_ready(): + return jsonify(result='ok', warning=BACKEND_NOT_READY_WARNING, latestBlocks=tempdict), 206 + else: + return jsonify(result='ok', latestBlocks=tempdict), 200 + + except Exception as e: + print("getLatestBlockDetails:", e) + return jsonify(result='error', description=INTERNAL_ERROR), 500 + + + + + +@app.route('/api/v1.0/getBlockTransactions/', methods=['GET']) +async def getblocktransactions(blockdetail): + try: + blockJson = await blockdetailhelper(blockdetail) + if len(blockJson) != 0: + blockJson = json.loads(blockJson[0][0]) + blocktxlist = blockJson['tx'] + blocktxs = {} + for i in range(len(blocktxlist)): + temptx = await transactiondetailhelper(blocktxlist[i]) + transactionJson = json.loads(temptx[0][0]) + transactionJson = await update_transaction_confirmations(transactionJson) + parseResult = json.loads(temptx[0][1]) + blocktxs[blocktxlist[i]] = { + "parsedFloData" : parseResult, + "transactionDetails" : transactionJson + } + return jsonify(result='ok', transactions=blocktxs, blockKeyword=blockdetail) + else: + if not is_backend_ready(): + return jsonify(result='error', description=BACKEND_NOT_READY_ERROR) + else: + return jsonify(result='error', description='Block doesn\'t exist in database') + + + except Exception as e: + print("getblocktransactions:", e) + return jsonify(result='error', description=INTERNAL_ERROR) + + +@app.route('/api/v1.0/categoriseString/', methods=['GET']) +async def categoriseString(urlstring): + try: + # Check if the hash is of a transaction asynchronously + async with aiohttp.ClientSession() as session: + async with session.get(f"{apiUrl}api/v1/tx/{urlstring}") as response: + if response.status == 200: + return jsonify(type="transaction"), 200 + + # Check if the hash is of a block asynchronously + async with session.get(f"{apiUrl}api/v1/block/{urlstring}") as response: + if response.status == 200: + return jsonify(type="block"), 200 + + # Initialize variables for database handling + database_prefix = f"{mysql_config.database_prefix}_" + database_suffix = "_db" + token_names = set() + contract_list = [] + + # Connect to MySQL information_schema using async `get_mysql_connection` + async with await get_mysql_connection("information_schema", no_standardize=True, USE_ASYNC=True) as conn_info: + async with conn_info.cursor() as cursor_info: + # Query to fetch all databases matching the prefix and suffix + await cursor_info.execute( + f"SELECT SCHEMA_NAME FROM SCHEMATA WHERE SCHEMA_NAME LIKE '{database_prefix}%' AND SCHEMA_NAME LIKE '%{database_suffix}'" + ) + databases = await cursor_info.fetchall() + + # Separate token databases and smart contract databases + for db in databases: + stripped_name = db[0][len(database_prefix):-len(database_suffix)] + + # Exclude "latestCache" and "system" databases + if stripped_name in ["latestCache", "system"]: + continue + + parts = stripped_name.split('_') + if len(parts) == 1: # Token database + token_names.add(stripped_name.lower()) + elif len(parts) == 2 and len(parts[1]) == 34 and parts[1].startswith('F'): # Smart contract database + contract_list.append({ + "contractName": parts[0], + "contractAddress": parts[1] + }) + + # Check if the string matches a token name + if urlstring.lower() in token_names: + return jsonify(type="token"), 200 + + # Connect to the system database asynchronously to check for smart contracts + async with await get_mysql_connection("system", USE_ASYNC=True) as conn_system: + async with conn_system.cursor() as cursor_system: + await cursor_system.execute("SELECT DISTINCT contractName FROM activeContracts") + smart_contract_names = {row[0].lower() for row in await cursor_system.fetchall()} + + if urlstring.lower() in smart_contract_names: + return jsonify(type="smartContract"), 200 + + # If no match, classify as noise + return jsonify(type="noise"), 200 + + except Exception as e: + print("categoriseString:", e) + return jsonify(result="error", description=INTERNAL_ERROR), 500 + + + +@app.route('/api/v1.0/getTokenSmartContractList', methods=['GET']) +async def getTokenSmartContractList(): + try: + # Prefix and suffix for databases + database_prefix = f"{mysql_config.database_prefix}_" + database_suffix = "_db" + + # Initialize lists + token_list = [] + contract_list = [] + + # Connect to MySQL information schema using `get_mysql_connection` with `no_standardize` + async with await get_mysql_connection("information_schema", no_standardize=True, USE_ASYNC=True) as conn_info: + async with conn_info.cursor() as cursor_info: + # Query all databases matching the prefix and suffix + await cursor_info.execute( + f"SELECT SCHEMA_NAME FROM SCHEMATA WHERE SCHEMA_NAME LIKE '{database_prefix}%' AND SCHEMA_NAME LIKE '%{database_suffix}'" + ) + all_databases = [row[0] for row in await cursor_info.fetchall()] + + # Separate token and smart contract databases + for db_name in all_databases: + stripped_name = db_name[len(database_prefix):-len(database_suffix)] + + # Exclude "latestCache" and "system" databases + if stripped_name in ["latestCache", "system"]: + continue + + parts = stripped_name.split('_') + + if len(parts) == 1: # Token database + token_list.append(stripped_name) + elif len(parts) == 2 and len(parts[1]) == 34 and parts[1].startswith('F'): # Smart contract database + contract_list.append({ + "contractName": parts[0], + "contractAddress": parts[1] + }) + + # Populate smart contract details + async with await get_mysql_connection("system", USE_ASYNC=True) as conn_system: + async with conn_system.cursor() as cursor_system: + await cursor_system.execute("SELECT * FROM activeContracts") + all_contracts_detail_list = await cursor_system.fetchall() + + for contract in all_contracts_detail_list: + for item in contract_list: + if item["contractName"] == contract[1] and item["contractAddress"] == contract[2]: + item.update({ + "status": contract[3], + "tokenIdentification": contract[4], + "contractType": contract[5], + "transactionHash": contract[6], + "blockNumber": contract[7], + "blockHash": contract[8], + "incorporationDate": contract[9], + "expiryDate": contract[10] if contract[10] else None, + "closeDate": contract[11] if contract[11] else None, + }) + + # Return response + if not is_backend_ready(): + return jsonify( + tokens=token_list, + warning=BACKEND_NOT_READY_WARNING, + smartContracts=contract_list, + result="ok" + ), 206 + else: + return jsonify( + tokens=token_list, + smartContracts=contract_list, + result="ok" + ), 200 + + except aiomysql.MySQLError as e: + print("Database error in getTokenSmartContractList:", e) + return jsonify(result="error", description="Database error occurred"), 500 + except Exception as e: + print("getTokenSmartContractList:", e) + return jsonify(result="error", description=INTERNAL_ERROR), 500 + + + + +################### +### VERSION 2 ### +################### + +@app.route('/api/v2/info', methods=['GET']) +async def info(): + try: + # Standardize database names + system_db_name = standardize_db_name("system") + latest_cache_db_name = standardize_db_name("latestCache") + + # Initialize data variables + tokenAddressCount = tokenCount = contractCount = lastscannedblock = 0 + validatedBlockCount = validatedTransactionCount = 0 + + # Use async with for resource management + async with await get_mysql_connection(system_db_name, USE_ASYNC=True) as conn_system: + async with conn_system.cursor() as cursor_system: + # Query for the number of FLO addresses in tokenAddress mapping + await cursor_system.execute("SELECT COUNT(DISTINCT tokenAddress) FROM tokenAddressMapping") + tokenAddressCount = (await cursor_system.fetchone())[0] + + await cursor_system.execute("SELECT COUNT(DISTINCT token) FROM tokenAddressMapping") + tokenCount = (await cursor_system.fetchone())[0] + + await cursor_system.execute("SELECT COUNT(DISTINCT contractName) FROM contractAddressMapping") + contractCount = (await cursor_system.fetchone())[0] + + await cursor_system.execute("SELECT value FROM systemData WHERE attribute='lastblockscanned'") + lastscannedblock = int((await cursor_system.fetchone())[0]) + + async with await get_mysql_connection(latest_cache_db_name, USE_ASYNC=True) as conn_latest_cache: + async with conn_latest_cache.cursor() as cursor_latest_cache: + # Query for total number of validated blocks + await cursor_latest_cache.execute("SELECT COUNT(DISTINCT blockNumber) FROM latestBlocks") + validatedBlockCount = (await cursor_latest_cache.fetchone())[0] + + await cursor_latest_cache.execute("SELECT COUNT(DISTINCT transactionHash) FROM latestTransactions") + validatedTransactionCount = (await cursor_latest_cache.fetchone())[0] + + # Construct response based on backend readiness + response_data = { + "systemAddressCount": tokenAddressCount, + "systemBlockCount": validatedBlockCount, + "systemTransactionCount": validatedTransactionCount, + "systemSmartContractCount": contractCount, + "systemTokenCount": tokenCount, + "lastscannedblock": lastscannedblock + } + + if not is_backend_ready(): + response_data["warning"] = BACKEND_NOT_READY_WARNING + return jsonify(response_data), 206 + else: + return jsonify(response_data), 200 + + except Exception as e: + print("info:", e) + return jsonify(description=INTERNAL_ERROR), 500 + + + + + +@app.route('/api/v2/broadcastTx/') +async def broadcastTx_v2(raw_transaction_hash): + try: + p1 = subprocess.run(['flo-cli',f"-datadir={FLO_DATA_DIR}",'sendrawtransaction',raw_transaction_hash], capture_output=True) + return jsonify(args=p1.args,returncode=p1.returncode,stdout=p1.stdout.decode(),stderr=p1.stderr.decode()), 200 + + + except Exception as e: + print("broadcastTx_v2:", e) + return jsonify(description=INTERNAL_ERROR), 500 + +@app.route('/api/v2/tokenList', methods=['GET']) +async def tokenList(): + try: + # Initialize token list + token_list = [] + + # Prefix and suffix for token databases + database_prefix = f"{mysql_config.database_prefix}_" + database_suffix = "_db" + + # Connect to the MySQL information schema asynchronously + async with await get_mysql_connection("information_schema", no_standardize=True, USE_ASYNC=True) as conn: + async with conn.cursor() as cursor: + try: + # Query to fetch all databases matching the prefix and suffix + await cursor.execute( + f"SELECT SCHEMA_NAME FROM SCHEMATA WHERE SCHEMA_NAME LIKE '{database_prefix}%' AND SCHEMA_NAME LIKE '%{database_suffix}'" + ) + all_databases = [row[0] for row in await cursor.fetchall()] + + # Filter token databases and exclude smart contract databases + for db_name in all_databases: + stripped_name = db_name[len(database_prefix):-len(database_suffix)] + + # Exclude "latestCache" and "system" databases + if stripped_name in ["latestCache", "system"]: + continue + + parts = stripped_name.split('_') + + # Include token databases and exclude smart contract databases + if len(parts) == 1: # Token databases have a single part (e.g., usd, inr) + token_list.append(stripped_name) + elif len(parts) == 2 and len(parts[1]) == 34 and parts[1].startswith('F'): + # Smart contract databases are excluded + continue + + except Exception as e: + print(f"Error querying databases: {e}") + return jsonify(description="Error querying databases"), 500 + + # Return response with backend readiness check + if not is_backend_ready(): + return jsonify(warning=BACKEND_NOT_READY_WARNING, tokens=token_list), 206 + else: + return jsonify(tokens=token_list), 200 + + except Exception as e: + print(f"tokenList:", e) + return jsonify(description=INTERNAL_ERROR), 500 + + + + + +@app.route('/api/v2/tokenInfo/', methods=['GET']) +async def tokenInfo(token): + try: + if token is None: + return jsonify(description='Token name has not been passed'), 400 + + # Standardize database name for the token + token_db_name = standardize_db_name(token) + + try: + # Connect to the token database asynchronously + async with await get_mysql_connection(token_db_name, USE_ASYNC=True) as conn: + async with conn.cursor() as cursor: + # Fetch incorporation data + await cursor.execute('SELECT * FROM transactionHistory WHERE id=1') + incorporationRow = await cursor.fetchone() + + # Fetch distinct active addresses count + await cursor.execute('SELECT COUNT(DISTINCT address) FROM activeTable') + numberOf_distinctAddresses = (await cursor.fetchone())[0] + + # Fetch total number of transactions + await cursor.execute('SELECT MAX(id) FROM transactionHistory') + numberOf_transactions = (await cursor.fetchone())[0] + + # Fetch associated contracts + await cursor.execute(''' + SELECT contractName, contractAddress, blockNumber, blockHash, transactionHash + FROM tokenContractAssociation + ''') + associatedContracts = await cursor.fetchall() + + # Process associated contracts + associatedContractList = [ + { + 'contractName': item[0], + 'contractAddress': item[1], + 'blockNumber': item[2], + 'blockHash': item[3], + 'transactionHash': item[4], + } + for item in associatedContracts + ] + + # Prepare the response + response = { + 'token': token, + 'incorporationAddress': incorporationRow[1], + 'tokenSupply': incorporationRow[3], + 'time': incorporationRow[6], + 'blockchainReference': incorporationRow[7], + 'activeAddress_no': numberOf_distinctAddresses, + 'totalTransactions': numberOf_transactions, + 'associatedSmartContracts': associatedContractList, + } + + if not is_backend_ready(): + return jsonify(warning=BACKEND_NOT_READY_WARNING, **response), 206 + else: + return jsonify(**response), 200 + + except aiomysql.MySQLError: + if not is_backend_ready(): + return jsonify(description=BACKEND_NOT_READY_ERROR), 503 + else: + return jsonify(description="Token database doesn't exist"), 404 + + except Exception as e: + print("tokenInfo:", e) + return jsonify(description=INTERNAL_ERROR), 500 + + +@app.route('/api/v2/tokenTransactions/', methods=['GET']) +async def tokenTransactions(token): + try: + if token is None: + return jsonify(description='Token name has not been passed'), 400 + + # Input validations + senderFloAddress = request.args.get('senderFloAddress') + if senderFloAddress is not None and not check_flo_address(senderFloAddress, is_testnet): + return jsonify(description='senderFloAddress validation failed'), 400 + + destFloAddress = request.args.get('destFloAddress') + if destFloAddress is not None and not check_flo_address(destFloAddress, is_testnet): + return jsonify(description='destFloAddress validation failed'), 400 + + limit = request.args.get('limit') + if limit is not None and not check_integer(limit): + return jsonify(description='limit validation failed'), 400 + + use_AND = request.args.get('use_AND') + if use_AND is not None and use_AND not in ['True', 'False']: + return jsonify(description='use_AND validation failed'), 400 + + _from = int(request.args.get('_from', 1)) # Page number, default is 1 + to = int(request.args.get('to', 100)) # Number of items per page, default is 100 + + if _from < 1: + return jsonify(description='_from validation failed'), 400 + if to < 1: + return jsonify(description='to validation failed'), 400 + + # Construct token database name + token_db_name = standardize_db_name(token) + + # Connect to the token database and execute the query asynchronously + async with await get_mysql_connection(token_db_name, USE_ASYNC=True) as conn: + async with conn.cursor() as cursor: + # Build the query dynamically + query = "SELECT jsonData, parsedFloData FROM transactionHistory WHERE 1=1" + params = [] + + if senderFloAddress: + query += " AND sourceFloAddress = %s" + params.append(senderFloAddress) + if destFloAddress: + query += " AND destFloAddress = %s" + params.append(destFloAddress) + if use_AND == 'True' and senderFloAddress and destFloAddress: + query += " AND sourceFloAddress = %s AND destFloAddress = %s" + params.extend([senderFloAddress, destFloAddress]) + + query += " ORDER BY id DESC LIMIT %s OFFSET %s" + params.extend([to, (_from - 1) * to]) + + # Execute the query asynchronously + await cursor.execute(query, tuple(params)) + transactionJsonData = await cursor.fetchall() + + # Process and format transactions asynchronously + sortedFormattedTransactions = [] + for row in transactionJsonData: + transactions_object = {} + transactions_object['transactionDetails'] = json.loads(row[0]) + transactions_object['transactionDetails'] = await update_transaction_confirmations(transactions_object['transactionDetails']) + transactions_object['parsedFloData'] = json.loads(row[1]) + sortedFormattedTransactions.append(transactions_object) + + # Prepare response + if not is_backend_ready(): + return jsonify(warning=BACKEND_NOT_READY_WARNING, token=token, transactions=sortedFormattedTransactions), 206 + else: + return jsonify(token=token, transactions=sortedFormattedTransactions), 200 + + except aiomysql.MySQLError as e: + print(f"Database error in tokenTransactions: {e}") + return jsonify(description="Database error occurred"), 500 + except Exception as e: + print("tokenTransactions:", e) + return jsonify(description=INTERNAL_ERROR), 500 + + +@app.route('/api/v2/tokenBalances/', methods=['GET']) +async def tokenBalances(token): + try: + # Validate the token parameter + if not token: + return jsonify(description="Token name has not been passed"), 400 + + # Standardize the token database name + token_db_name = standardize_db_name(token) + + try: + # Establish async connection to the token database + async with await get_mysql_connection(token_db_name, USE_ASYNC=True) as conn: + async with conn.cursor() as cursor: + # Fetch balances grouped by address + query = "SELECT address, SUM(transferBalance) FROM activeTable GROUP BY address" + await cursor.execute(query) + addressBalances = await cursor.fetchall() + + # Create the return dictionary + returnList = {address: balance for address, balance in addressBalances} + + except aiomysql.MySQLError as e: + print(f"Database error while fetching balances for token {token}: {e}") + if not is_backend_ready(): + return jsonify(description=BACKEND_NOT_READY_ERROR), 503 + else: + return jsonify(description="Token database doesn't exist"), 404 + + # Prepare and return the response + if not is_backend_ready(): + return jsonify( + warning=BACKEND_NOT_READY_WARNING, + token=token, + balances=returnList + ), 206 + else: + return jsonify( + token=token, + balances=returnList + ), 200 + + except Exception as e: + print("tokenBalances:", e) + return jsonify(description=INTERNAL_ERROR), 500 + + +@app.route('/api/v2/floAddressInfo/', methods=['GET']) +async def floAddressInfo(floAddress): + try: + if not floAddress: + return jsonify(description="floAddress hasn't been passed"), 400 + + # Validate the FLO address + if not check_flo_address(floAddress, is_testnet): + return jsonify(description="floAddress validation failed"), 400 + + detail_list = {} + incorporated_smart_contracts = [] + + # Connect to the system database to fetch associated tokens and contracts + async with await get_mysql_connection(standardize_db_name("system"), USE_ASYNC=True) as conn: + async with conn.cursor() as cursor: + # Fetch tokens associated with the FLO address + await cursor.execute("SELECT DISTINCT token FROM tokenAddressMapping WHERE tokenAddress = %s", (floAddress,)) + token_names = [row[0] for row in await cursor.fetchall()] + + # Fetch incorporated smart contracts + smart_contract_query = """ + SELECT contractName, status, tokenIdentification, contractType, transactionHash, blockNumber, blockHash + FROM activeContracts + WHERE contractAddress = %s + """ + await cursor.execute(smart_contract_query, (floAddress,)) + incorporated_contracts = await cursor.fetchall() + + # Process token balances concurrently + if token_names: + tasks = [fetch_token_balance(token, floAddress) for token in token_names] + balances = await asyncio.gather(*tasks, return_exceptions=True) + + for token, balance_result in zip(token_names, balances): + if isinstance(balance_result, Exception): + print(f"Error fetching balance for token {token}: {balance_result}") + else: + detail_list.update(balance_result) + + # Process incorporated contracts + for contract in incorporated_contracts: + incorporated_smart_contracts.append({ + 'contractName': contract[0], + 'contractAddress': floAddress, + 'status': contract[1], + 'tokenIdentification': contract[2], + 'contractType': contract[3], + 'transactionHash': contract[4], + 'blockNumber': contract[5], + 'blockHash': contract[6], + }) + + # Prepare the response + response = { + 'floAddress': floAddress, + 'floAddressBalances': detail_list or None, + 'incorporatedSmartContracts': incorporated_smart_contracts or None + } + + if not is_backend_ready(): + response['warning'] = BACKEND_NOT_READY_WARNING + return jsonify(response), 206 + else: + return jsonify(response), 200 + + except Exception as e: + print(f"floAddressInfo: {e}") + return jsonify(description="Unexpected error occurred"), 500 + + +@app.route('/api/v2/floAddressBalance/', methods=['GET']) +async def floAddressBalance(floAddress): + try: + if not floAddress: + return jsonify(description="floAddress hasn't been passed"), 400 + + # Validate the FLO address + if not check_flo_address(floAddress, is_testnet): + return jsonify(description="floAddress validation failed"), 400 + + token = request.args.get('token') + + # Case 1: Fetch balances for all associated tokens + if not token: + async with await get_mysql_connection(standardize_db_name("system"), USE_ASYNC=True) as system_conn: + async with system_conn.cursor() as cursor: + # Fetch tokens associated with the FLO address + query = "SELECT DISTINCT token FROM tokenAddressMapping WHERE tokenAddress = %s" + await cursor.execute(query, (floAddress,)) + token_names = [row[0] for row in await cursor.fetchall()] + + if not token_names: + return jsonify(description="No tokens associated with the FLO address"), 404 + + # Fetch balances for each token concurrently + tasks = [fetch_token_balance(token_name, floAddress) for token_name in token_names] + results = await asyncio.gather(*tasks, return_exceptions=True) + + # Aggregate balances + detail_list = {} + for token_name, result in zip(token_names, results): + if isinstance(result, Exception): + print(f"Error fetching balance for token {token_name}: {result}") + else: + detail_list.update(result) + + # Prepare the response + response = { + 'floAddress': floAddress, + 'floAddressBalances': detail_list + } + if not is_backend_ready(): + response['warning'] = BACKEND_NOT_READY_WARNING + return jsonify(response), 206 + return jsonify(response), 200 + + # Case 2: Fetch balance for a specific token + else: + result = await fetch_token_balance(token, floAddress) + balance = result.get(token, {}).get('balance', 0) + + response = { + 'floAddress': floAddress, + 'token': token, + 'balance': balance + } + if not is_backend_ready(): + response['warning'] = BACKEND_NOT_READY_WARNING + return jsonify(response), 206 + return jsonify(response), 200 + + except aiomysql.MySQLError as e: + print(f"Database error in floAddressBalance: {e}") + return jsonify(description="Database error occurred"), 500 + except Exception as e: + print(f"floAddressBalance: {e}") + return jsonify(description="Unexpected error occurred"), 500 + + +@app.route('/api/v2/floAddressTransactions/', methods=['GET']) +async def floAddressTransactions(floAddress): + try: + # Validate input + if floAddress is None: + return jsonify(description='floAddress has not been passed'), 400 + if not check_flo_address(floAddress, is_testnet): + return jsonify(description='floAddress validation failed'), 400 + + # Validate limit + limit = request.args.get('limit') + if limit is not None and not check_integer(limit): + return jsonify(description='limit validation failed'), 400 + + # Get optional token filter + token = request.args.get('token') + all_transaction_list = [] + + if token is None: + try: + # Fetch associated tokens for the FLO address + async with await get_mysql_connection(standardize_db_name("system"), USE_ASYNC=True) as system_conn: + async with system_conn.cursor() as cursor: + query = "SELECT DISTINCT token FROM tokenAddressMapping WHERE tokenAddress = %s" + await cursor.execute(query, (floAddress,)) + token_names = [row[0] for row in await cursor.fetchall()] + + # If no tokens found, return 404 + if not token_names: + return jsonify(description="No tokens associated with the FLO address"), 404 + + # Fetch transactions for all associated tokens concurrently + tasks = [ + fetch_token_transactions(token_name, floAddress, floAddress, limit) + for token_name in token_names + ] + transaction_data = await asyncio.gather(*tasks) + + # Aggregate transactions from all tokens + for data in transaction_data: + all_transaction_list.extend(data) + + except aiomysql.MySQLError as e: + print(f"Database error while fetching tokens: {e}") + return jsonify(description="Database error occurred"), 500 + + else: + try: + # Fetch transactions for the specified token + all_transaction_list = await fetch_token_transactions( + token, floAddress, floAddress, limit + ) + + except aiomysql.MySQLError as e: + print(f"Error accessing token database {token}: {e}") + return jsonify(description="Token database error occurred"), 500 + + # Sort and format transactions + sorted_formatted_transactions = sort_transactions(all_transaction_list) + + # Prepare response + response = { + "floAddress": floAddress, + "transactions": sorted_formatted_transactions + } + if token: + response["token"] = token + if not is_backend_ready(): + response["warning"] = BACKEND_NOT_READY_WARNING + return jsonify(response), 206 + return jsonify(response), 200 + + except Exception as e: + print("floAddressTransactions:", e) + return jsonify(description="Unexpected error occurred"), 500 + + + + + + +# SMART CONTRACT APIs +@app.route('/api/v2/smartContractList', methods=['GET']) +async def getContractList_v2(): + try: + # Retrieve and validate query parameters + contractName = request.args.get('contractName') + if contractName: + contractName = contractName.strip().lower() + + contractAddress = request.args.get('contractAddress') + if contractAddress: + contractAddress = contractAddress.strip() + if not check_flo_address(contractAddress, is_testnet): + return jsonify(description="contractAddress validation failed"), 400 + + # Standardize the system database name + system_db_name = standardize_db_name("system") + + # Initialize contract list + smart_contracts_morphed = [] + + # Fetch contracts from the database asynchronously + async with await get_mysql_connection(system_db_name, USE_ASYNC=True) as conn: + async with conn.cursor() as cursor: + # Build query dynamically + query = "SELECT * FROM activecontracts" + conditions = [] + params = [] + + if contractName: + conditions.append("contractName=%s") + params.append(contractName) + + if contractAddress: + conditions.append("contractAddress=%s") + params.append(contractAddress) + + if conditions: + query += " WHERE " + " AND ".join(conditions) + + # Execute query and fetch results + await cursor.execute(query, tuple(params)) + smart_contracts = await cursor.fetchall() + + # Morph the smart contract data + smart_contracts_morphed = await smartcontract_morph_helper(smart_contracts) + + # Fetch the committee address list asynchronously + committeeAddressList = await refresh_committee_list(APP_ADMIN, apiUrl, int(time.time())) + + # Prepare the response + response = { + "smartContracts": smart_contracts_morphed, + "smartContractCommittee": committeeAddressList, + } + + if not is_backend_ready(): + response["warning"] = BACKEND_NOT_READY_WARNING + return jsonify(response), 206 + + return jsonify(response), 200 + + except Exception as e: + print("getContractList_v2:", e) + return jsonify(description="Unexpected error occurred"), 500 + + + +@app.route('/api/v2/getSmartContractInfo', methods=['GET'], endpoint='getSmartContractInfoV2') +async def getContractInfo(): + try: + # Validate query parameters + contractName = request.args.get('contractName') + contractAddress = request.args.get('contractAddress') + + if not contractName: + return jsonify(result='error', description="Smart Contract's name hasn't been passed"), 400 + if not contractAddress: + return jsonify(result='error', description="Smart Contract's address hasn't been passed"), 400 + + # Standardize the database names + contract_db_name = standardize_db_name(f"{contractName}_{contractAddress}") + system_db_name = standardize_db_name("system") + + # Initialize the response structure + contract_info = {} + + # Fetch contract structure and participants/token details + async with await get_mysql_connection(contract_db_name, USE_ASYNC=True) as conn_contract: + async with conn_contract.cursor() as cursor: + # Fetch contract structure + await cursor.execute("SELECT attribute, value FROM contractstructure") + results = await cursor.fetchall() + + exit_conditions = {} + for idx, (attribute, value) in enumerate(results): + if attribute == 'exitconditions': + exit_conditions[idx] = value + else: + contract_info[attribute] = value + + if exit_conditions: + contract_info['userChoice'] = exit_conditions + + # Fetch participant details + await cursor.execute("SELECT COUNT(participantAddress) FROM contractparticipants") + contract_info['numberOfParticipants'] = (await cursor.fetchone())[0] + + await cursor.execute("SELECT SUM(tokenAmount) FROM contractparticipants") + contract_info['tokenAmountDeposited'] = (await cursor.fetchone())[0] + + # Fetch system-level contract details + async with await get_mysql_connection(system_db_name, USE_ASYNC=True) as conn_system: + async with conn_system.cursor() as cursor: + query = """ + SELECT status, incorporationDate, expiryDate, closeDate + FROM activecontracts + WHERE contractName = %s AND contractAddress = %s + """ + await cursor.execute(query, (contractName, contractAddress)) + system_results = await cursor.fetchone() + + if system_results: + contract_info.update({ + 'status': system_results[0], + 'incorporationDate': system_results[1], + 'expiryDate': system_results[2], + 'closeDate': system_results[3] + }) + + # Handle additional logic for closed contracts + if contract_info.get('status') == 'closed' and contract_info.get('contractType') == 'one-time-event': + async with await get_mysql_connection(contract_db_name, USE_ASYNC=True) as conn_contract: + async with conn_contract.cursor() as cursor: + await cursor.execute(""" + SELECT transactionType, transactionSubType + FROM contractTransactionHistory + WHERE transactionType = 'trigger' + """) + triggers = await cursor.fetchall() + + if len(triggers) == 1: + trigger_type = triggers[0][1] + contract_info['triggerType'] = trigger_type + + # Fetch winning details if user choices exist + if 'userChoice' in contract_info: + if trigger_type is None: + await cursor.execute(""" + SELECT userChoice + FROM contractparticipants + WHERE winningAmount IS NOT NULL + LIMIT 1 + """) + contract_info['winningChoice'] = (await cursor.fetchone())[0] + + await cursor.execute(""" + SELECT participantAddress, winningAmount + FROM contractparticipants + WHERE winningAmount IS NOT NULL + """) + winners = await cursor.fetchall() + contract_info['numberOfWinners'] = len(winners) + else: + return jsonify(result='error', description="Data integrity issue: multiple triggers found"), 500 + + # Final response + response = { + 'result': 'ok', + 'contractName': contractName, + 'contractAddress': contractAddress, + 'contractInfo': contract_info + } + + if not is_backend_ready(): + response['warning'] = BACKEND_NOT_READY_WARNING + return jsonify(response), 206 + + return jsonify(response), 200 + + except Exception as e: + print("getContractInfo:", e) + return jsonify(result='error', description="Unexpected error occurred"), 500 + + +@app.route('/api/v2/smartContractParticipants', methods=['GET']) +async def getcontractparticipants_v2(): + try: + # Validate query parameters + contractName = request.args.get('contractName') + contractAddress = request.args.get('contractAddress') + + if not contractName: + return jsonify(description="Smart Contract's name hasn't been passed"), 400 + if not contractAddress: + return jsonify(description="Smart Contract's address hasn't been passed"), 400 + if not check_flo_address(contractAddress, is_testnet): + return jsonify(description="contractAddress validation failed"), 400 + + # Standardize database name + contractName = contractName.strip().lower() + contractAddress = contractAddress.strip() + db_name = standardize_db_name(f"{contractName}_{contractAddress}") + + # Fetch contract structure and status + contractStructure = await fetchContractStructure(contractName, contractAddress) + contractStatus = await fetchContractStatus(contractName, contractAddress) + + # Initialize the response + participantInfo = [] + + # Async connection to the contract database + async with await get_mysql_connection(db_name, USE_ASYNC=True) as conn: + async with conn.cursor() as cursor: + + # Handle external-trigger contracts + if 'exitconditions' in contractStructure: + token = contractStructure.get('tokenIdentification', None) + if contractStatus == 'closed': + query = ''' + SELECT id, participantAddress, tokenAmount, userChoice, transactionHash + FROM contractparticipants + ''' + await cursor.execute(query) + participants = await cursor.fetchall() + + for row in participants: + await cursor.execute( + 'SELECT winningAmount FROM contractwinners WHERE referenceTxHash = %s', + (row[4],) + ) + winningAmount = (await cursor.fetchone() or [0])[0] + participantInfo.append({ + 'participantFloAddress': row[1], + 'tokenAmount': row[2], + 'userChoice': row[3], + 'transactionHash': row[4], + 'winningAmount': winningAmount, + 'tokenIdentification': token + }) + else: + query = ''' + SELECT id, participantAddress, tokenAmount, userChoice, transactionHash + FROM contractparticipants + ''' + await cursor.execute(query) + participants = await cursor.fetchall() + + for row in participants: + participantInfo.append({ + 'participantFloAddress': row[1], + 'tokenAmount': row[2], + 'userChoice': row[3], + 'transactionHash': row[4] + }) + + contractSubtype = 'external-trigger' + + # Handle time-trigger contracts + elif 'payeeAddress' in contractStructure: + query = ''' + SELECT id, participantAddress, tokenAmount, transactionHash + FROM contractparticipants + ''' + await cursor.execute(query) + participants = await cursor.fetchall() + + for row in participants: + participantInfo.append({ + 'participantFloAddress': row[1], + 'tokenAmount': row[2], + 'transactionHash': row[3] + }) + + contractSubtype = 'time-trigger' + + # Handle tokenswap contracts + elif contractStructure['contractType'] == 'continuos-event' and contractStructure['subtype'] == 'tokenswap': + query = ''' + SELECT id, participantAddress, participationAmount, swapPrice, transactionHash, blockNumber, blockHash, swapAmount + FROM contractparticipants + ''' + await cursor.execute(query) + participants = await cursor.fetchall() + + for row in participants: + participantInfo.append({ + 'participantFloAddress': row[1], + 'participationAmount': row[2], + 'swapPrice': float(row[3]), + 'transactionHash': row[4], + 'blockNumber': row[5], + 'blockHash': row[6], + 'swapAmount': row[7] + }) + + contractSubtype = 'tokenswap' + + else: + return jsonify(description="Unsupported contract type"), 400 + + # Prepare the response + response = { + 'contractName': contractName, + 'contractAddress': contractAddress, + 'contractType': contractStructure['contractType'], + 'contractSubtype': contractSubtype, + 'participantInfo': participantInfo + } + + if not is_backend_ready(): + response['warning'] = BACKEND_NOT_READY_WARNING + return jsonify(response), 206 + else: + return jsonify(response), 200 + + except Exception as e: + print("getcontractparticipants_v2:", e) + return jsonify(description="Unexpected error occurred"), 500 + + + + +@app.route('/api/v2/getParticipantDetails', methods=['GET'], endpoint='getParticipantDetailsV2') +async def getParticipantDetails(): + try: + floAddress = request.args.get('floAddress') + contractName = request.args.get('contractName') + contractAddress = request.args.get('contractAddress') + + # Validate input parameters + if not floAddress: + return jsonify(result='error', description='FLO address hasn\'t been passed'), 400 + + if (contractName and not contractAddress) or (contractAddress and not contractName): + return jsonify(result='error', description='Pass both, contractName and contractAddress as URL parameters'), 400 + + floAddress = floAddress.strip() + if contractName: + contractName = contractName.strip().lower() + if contractAddress: + contractAddress = contractAddress.strip() + + # Standardize database names + system_db_name = standardize_db_name("system") + contract_db_name = standardize_db_name(f"{contractName}_{contractAddress}") if contractName and contractAddress else None + + participationDetailsList = [] + + # Connect to the system database asynchronously + async with await get_mysql_connection(system_db_name, USE_ASYNC=True) as conn_system: + async with conn_system.cursor() as cursor_system: + # Build the query dynamically + if contractName and contractAddress: + query = ''' + SELECT * + FROM contractAddressMapping + WHERE address = %s AND addressType = "participant" AND contractName = %s AND contractAddress = %s + ''' + params = (floAddress, contractName, contractAddress) + else: + query = ''' + SELECT * + FROM contractAddressMapping + WHERE address = %s AND addressType = "participant" + ''' + params = (floAddress,) + + # Execute the query asynchronously + await cursor_system.execute(query, params) + participant_address_contracts = await cursor_system.fetchall() + + if not participant_address_contracts: + if not is_backend_ready(): + return jsonify(result='error', description=BACKEND_NOT_READY_ERROR), 503 + else: + return jsonify(result='error', description='Address hasn\'t participated in any contract'), 404 + + # Process each contract the address has participated in + for contract in participant_address_contracts: + detailsDict = {} + contract_name = contract[3] + contract_address = contract[4] + contract_db_name = standardize_db_name(f"{contract_name}_{contract_address}") + + # Fetch contract structure asynchronously + contractStructure = await fetchContractStructure(contract_name, contract_address) + + # Async connection to the contract database + async with await get_mysql_connection(contract_db_name, USE_ASYNC=True) as conn_contract: + async with conn_contract.cursor() as cursor_contract: + if contractStructure['contractType'] == 'tokenswap': + # Fetch participant details for tokenswap contracts + query = ''' + SELECT participantAddress, participationAmount, receivedAmount, transactionHash, blockNumber, blockHash + FROM contractparticipants + WHERE participantAddress = %s + ''' + await cursor_contract.execute(query, (floAddress,)) + participation_details = await cursor_contract.fetchall() + + participationList = [] + for row in participation_details: + participationList.append({ + 'participationAddress': floAddress, + 'participationAmount': row[1], + 'receivedAmount': row[2], + 'transactionHash': row[3], + 'blockNumber': row[4], + 'blockHash': row[5] + }) + + detailsDict['contractName'] = contract_name + detailsDict['contractAddress'] = contract_address + detailsDict['participationDetails'] = participationList + + elif contractStructure['contractType'] == 'one-time-event' and 'payeeAddress' in contractStructure: + # Fetch participant details for one-time-event contracts + query = ''' + SELECT tokenAmount, transactionHash + FROM contractparticipants + WHERE participantAddress = %s + ''' + await cursor_contract.execute(query, (floAddress,)) + result = await cursor_contract.fetchone() + + detailsDict['contractName'] = contract_name + detailsDict['contractAddress'] = contract_address + detailsDict['tokenAmount'] = result[0] + detailsDict['transactionHash'] = result[1] + + # Add more contract type logic here if needed + + participationDetailsList.append(detailsDict) + + # Finalize the response + if not is_backend_ready(): + return jsonify( + result='ok', + warning=BACKEND_NOT_READY_WARNING, + floAddress=floAddress, + type='participant', + participatedContracts=participationDetailsList + ), 206 + else: + return jsonify( + result='ok', + floAddress=floAddress, + type='participant', + participatedContracts=participationDetailsList + ), 200 + + except aiomysql.MySQLError as db_error: + print(f"Database error: {db_error}") + return jsonify(result='error', description="Database error occurred"), 500 + except Exception as e: + print(f"getParticipantDetails: {e}") + return jsonify(result='error', description="Unexpected error occurred"), 500 + + + + + +@app.route('/api/v2/smartContractTransactions', methods=['GET']) +async def smartcontracttransactions(): + try: + # Get and validate query parameters + contractName = request.args.get('contractName') + if not contractName: + return jsonify(description="Smart Contract's name hasn't been passed"), 400 + contractName = contractName.strip().lower() + + contractAddress = request.args.get('contractAddress') + if not contractAddress: + return jsonify(description="Smart Contract's address hasn't been passed"), 400 + contractAddress = contractAddress.strip() + if not check_flo_address(contractAddress, is_testnet): + return jsonify(description="contractAddress validation failed"), 400 + + _from = int(request.args.get('_from', 1)) # Page number, default is 1 + to = int(request.args.get('to', 100)) # Limit per page, default is 100 + + if _from < 1: + return jsonify(description="_from validation failed"), 400 + if to < 1: + return jsonify(description="to validation failed"), 400 + + # Standardize the contract database name + contractDbName = standardize_db_name(f"{contractName}_{contractAddress}") + + # Check if the smart contract database exists + async with await get_mysql_connection("information_schema", no_standardize=True, USE_ASYNC=True) as conn: + async with conn.cursor() as cursor: + query = "SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = %s" + await cursor.execute(query, (contractDbName,)) + db_exists = await cursor.fetchone() + + if not db_exists: + # Handle missing smart contract database + if not is_backend_ready(): + return jsonify(description=BACKEND_NOT_READY_ERROR), 503 + else: + return jsonify(description="Smart Contract with the given name doesn't exist"), 404 + + # Fetch transaction data for the smart contract + transactionJsonData = await fetch_contract_transactions(contractName, contractAddress, _from, to) + transactionJsonData = sort_transactions(transactionJsonData) + + # Construct response + response_data = { + "contractName": contractName, + "contractAddress": contractAddress, + "contractTransactions": transactionJsonData + } + + if not is_backend_ready(): + response_data["warning"] = BACKEND_NOT_READY_WARNING + return jsonify(response_data), 206 + else: + return jsonify(response_data), 200 + + except ValueError as ve: + # Handle invalid input for _from or to + print(f"Value error in smartcontracttransactions: {ve}") + return jsonify(description="Invalid input for _from or to"), 400 + + except Exception as e: + # General error handling + print(f"smartcontracttransactions: {e}") + return jsonify(description=INTERNAL_ERROR), 500 + + + + + +# todo - add options to only ask for active/consumed/returned deposits +@app.route('/api/v2/smartContractDeposits', methods=['GET']) +async def smartcontractdeposits(): + try: + # Validate input parameters + contractName = request.args.get('contractName') + if not contractName: + return jsonify(description="Smart Contract's name hasn't been passed"), 400 + contractName = contractName.strip().lower() + + contractAddress = request.args.get('contractAddress') + if not contractAddress: + return jsonify(description="Smart Contract's address hasn't been passed"), 400 + contractAddress = contractAddress.strip() + if not check_flo_address(contractAddress, is_testnet): + return jsonify(description="contractAddress validation failed"), 400 + + # Standardize the database name + db_name = standardize_db_name(f"{contractName}_{contractAddress}") + + # Connect to the smart contract database asynchronously + async with await get_mysql_connection(db_name, USE_ASYNC=True) as conn: + async with conn.cursor() as cursor: + # Fetch distinct deposits with the latest balances + distinct_deposits_query = """ + SELECT depositorAddress, transactionHash, status, depositBalance + FROM contractdeposits + WHERE (transactionHash, id) IN ( + SELECT transactionHash, MAX(id) + FROM contractdeposits + GROUP BY transactionHash + ) + ORDER BY id DESC; + """ + await cursor.execute(distinct_deposits_query) + distinct_deposits = await cursor.fetchall() + + deposit_info = [] + for deposit in distinct_deposits: + depositor_address, transaction_hash, status, current_balance = deposit + + # Fetch the original deposit balance and expiry time asynchronously + original_deposit_query = """ + SELECT depositBalance, unix_expiryTime + FROM contractdeposits + WHERE transactionHash = %s + ORDER BY id + LIMIT 1; + """ + await cursor.execute(original_deposit_query, (transaction_hash,)) + original_deposit = await cursor.fetchone() + + if original_deposit: + original_balance, expiry_time = original_deposit + deposit_info.append({ + 'depositorAddress': depositor_address, + 'transactionHash': transaction_hash, + 'status': status, + 'originalBalance': original_balance, + 'currentBalance': current_balance, + 'time': expiry_time + }) + + # Fetch the current total deposit balance + total_deposit_balance_query = """ + SELECT SUM(depositBalance) AS totalDepositBalance + FROM contractdeposits c1 + WHERE id = ( + SELECT MAX(id) + FROM contractdeposits c2 + WHERE c1.transactionHash = c2.transactionHash + ); + """ + await cursor.execute(total_deposit_balance_query) + current_deposit_balance = await cursor.fetchone() + + # Prepare the response + response = { + 'currentDepositBalance': current_deposit_balance[0] if current_deposit_balance else 0, + 'depositInfo': deposit_info + } + + # Return response based on backend readiness + if not is_backend_ready(): + response['warning'] = BACKEND_NOT_READY_WARNING + return jsonify(response), 206 + else: + return jsonify(response), 200 + + except aiomysql.MySQLError as db_error: + print(f"Database error in smartcontractdeposits: {db_error}") + return jsonify(description="Database error occurred"), 500 + except Exception as e: + print(f"smartcontractdeposits: {e}") + return jsonify(description=INTERNAL_ERROR), 500 + + + + +@app.route('/api/v2/blockDetails/', methods=['GET']) +async def blockdetails(blockHash): + try: + # todo - validate blockHash + blockJson = await blockdetailhelper(blockHash) + if len(blockJson) != 0: + blockJson = json.loads(blockJson[0][0]) + return jsonify(blockDetails=blockJson), 200 + else: + if not is_backend_ready(): + return jsonify(description=BACKEND_NOT_READY_ERROR), 503 + else: + return jsonify(description='Block doesn\'t exist in database'), 404 + except Exception as e: + print("blockdetails:", e) + return jsonify(description=INTERNAL_ERROR), 500 + + + +@app.route('/api/v2/transactionDetails/', methods=['GET']) +async def transactiondetails1(transactionHash): + try: + # Fetch transaction details using the helper + transactionJsonData = await transactiondetailhelper(transactionHash) + + if transactionJsonData: + transactionJson = json.loads(transactionJsonData[0][0]) + transactionJson = await update_transaction_confirmations(transactionJson) + parseResult = json.loads(transactionJsonData[0][1]) + operation = transactionJsonData[0][2] + db_reference = transactionJsonData[0][3] + + sender_address, receiver_address = extract_ip_op_addresses(transactionJson) + mergeTx = {**parseResult, **transactionJson} + mergeTx['onChain'] = True + operationDetails = {} + + # Standardize database name + db_name = standardize_db_name(db_reference) + + async with await get_mysql_connection(db_name, USE_ASYNC=True) as conn: + async with conn.cursor() as cursor: + if operation == 'smartContractDeposit': + # Handle smart contract deposits + await cursor.execute(""" + SELECT depositAmount, blockNumber + FROM contractdeposits + WHERE status = 'deposit-return' AND transactionHash = %s + """, (transactionJson['txid'],)) + returned_deposit_tx = await cursor.fetchall() + if returned_deposit_tx: + operationDetails['returned_depositAmount'] = returned_deposit_tx[0][0] + operationDetails['returned_blockNumber'] = returned_deposit_tx[0][1] + + await cursor.execute(""" + SELECT depositAmount, blockNumber + FROM contractdeposits + WHERE status = 'deposit-honor' AND transactionHash = %s + """, (transactionJson['txid'],)) + deposit_honors = await cursor.fetchall() + operationDetails['depositHonors'] = { + 'list': [{'honor_amount': honor[0], 'blockNumber': honor[1]} for honor in deposit_honors], + 'count': len(deposit_honors) + } + + await cursor.execute(""" + SELECT depositBalance + FROM contractdeposits + WHERE id = (SELECT MAX(id) FROM contractdeposits WHERE transactionHash = %s) + """, (transactionJson['txid'],)) + depositBalance = await cursor.fetchone() + operationDetails['depositBalance'] = depositBalance[0] + operationDetails['consumedAmount'] = parseResult['depositAmount'] - operationDetails['depositBalance'] + + elif operation == 'tokenswap-participation': + # Handle token swap participation + await cursor.execute(""" + SELECT tokenAmount, winningAmount, userChoice + FROM contractparticipants + WHERE transactionHash = %s + """, (transactionJson['txid'],)) + swap_amounts = await cursor.fetchone() + + await cursor.execute("SELECT value FROM contractstructure WHERE attribute = 'selling_token'") + structure = await cursor.fetchone() + + operationDetails = { + 'participationAmount': swap_amounts[0], + 'receivedAmount': swap_amounts[1], + 'participationToken': parseResult['tokenIdentification'], + 'receivedToken': structure[0], + 'swapPrice_received_to_participation': float(swap_amounts[2]) + } + + elif operation == 'smartContractPays': + # Handle smart contract payouts + await cursor.execute(""" + SELECT participantAddress, tokenAmount, userChoice, winningAmount + FROM contractparticipants + WHERE winningAmount IS NOT NULL + """) + winner_participants = await cursor.fetchall() + operationDetails = { + 'total_winners': len(winner_participants), + 'winning_choice': winner_participants[0][2] if winner_participants else None, + 'winner_list': [ + { + 'participantAddress': participant[0], + 'participationAmount': participant[1], + 'winningAmount': participant[3] + } for participant in winner_participants + ] + } + + elif operation == 'ote-externaltrigger-participation': + # Handle external-trigger participation + await cursor.execute(""" + SELECT winningAmount + FROM contractparticipants + WHERE transactionHash = %s + """, (transactionHash,)) + winningAmount = await cursor.fetchone() + if winningAmount and winningAmount[0] is not None: + operationDetails['winningAmount'] = winningAmount[0] + + elif operation == 'tokenswapParticipation': + # Handle token swap participation + contractName, contractAddress = db_reference.rsplit('_', 1) + txhash_txs = await fetch_swap_contract_transactions(contractName, contractAddress, transactionHash) + mergeTx['subTransactions'] = [ + tx for tx in txhash_txs if not tx.get('onChain', True) + ] + + mergeTx['operation'] = operation + mergeTx['operationDetails'] = operationDetails + return jsonify(mergeTx), 200 + + else: + # Handle no transaction found + if not is_backend_ready(): + return jsonify(description=BACKEND_NOT_READY_ERROR), 503 + else: + return jsonify(description="Transaction doesn't exist in database"), 404 + + except Exception as e: + print(f"transactiondetails1: {e}") + return jsonify(description=INTERNAL_ERROR), 500 + + + +@app.route('/api/v2/latestTransactionDetails', methods=['GET']) +async def latestTransactionDetails(): + try: + # Validate the 'limit' parameter + limit = request.args.get('limit') + if limit is not None and not check_integer(limit): + return jsonify(description='limit validation failed'), 400 + + # Standardize the database name + db_name = standardize_db_name("latestCache") + + # Connect to the database asynchronously + async with await get_mysql_connection(db_name, USE_ASYNC=True) as conn: + async with conn.cursor() as cursor: + # Build and execute the query + if limit is not None: + query = """ + SELECT * FROM latestTransactions + WHERE blockNumber IN ( + SELECT DISTINCT blockNumber + FROM latestTransactions + ORDER BY blockNumber DESC + LIMIT %s + ) + ORDER BY id DESC; + """ + await cursor.execute(query, (int(limit),)) + else: + query = """ + SELECT * FROM latestTransactions + WHERE blockNumber IN ( + SELECT DISTINCT blockNumber + FROM latestTransactions + ORDER BY blockNumber DESC + ) + ORDER BY id DESC; + """ + await cursor.execute(query) + + # Fetch and process transactions + latestTransactions = await cursor.fetchall() + tx_list = [] + for item in latestTransactions: + item = list(item) + tx_parsed_details = {} + + # Parse transaction details + tx_parsed_details['transactionDetails'] = json.loads(item[3]) + tx_parsed_details['transactionDetails'] = await update_transaction_confirmations(tx_parsed_details['transactionDetails']) + tx_parsed_details['parsedFloData'] = json.loads(item[5]) + tx_parsed_details['parsedFloData']['transactionType'] = item[4] + tx_parsed_details['transactionDetails']['blockheight'] = int(item[2]) + + # Merge parsed details + merged_tx = {**tx_parsed_details['transactionDetails'], **tx_parsed_details['parsedFloData']} + merged_tx['onChain'] = True + + # Append to the transaction list + tx_list.append(merged_tx) + + # Return response based on backend readiness + if not is_backend_ready(): + return jsonify(warning=BACKEND_NOT_READY_WARNING, latestTransactions=tx_list), 206 + else: + return jsonify(latestTransactions=tx_list), 200 + + except Exception as e: + print("latestTransactionDetails:", e) + return jsonify(description=INTERNAL_ERROR), 500 + + + + + + +@app.route('/api/v2/latestBlockDetails', methods=['GET']) +async def latestBlockDetails(): + try: + # Validate the 'limit' parameter + limit = request.args.get('limit') + if limit is not None and not check_integer(limit): + return jsonify(description='limit validation failed'), 400 + + # Standardize the database name + db_name = standardize_db_name("latestCache") + + # Connect to the database asynchronously + async with await get_mysql_connection(db_name, USE_ASYNC=True) as conn: + async with conn.cursor() as cursor: + # Build the SQL query + if limit is None: + query = """ + SELECT jsonData + FROM ( + SELECT * + FROM latestBlocks + ORDER BY blockNumber DESC + LIMIT 4 + ) subquery + ORDER BY id DESC; + """ + await cursor.execute(query) + else: + query = """ + SELECT jsonData + FROM ( + SELECT * + FROM latestBlocks + ORDER BY blockNumber DESC + LIMIT %s + ) subquery + ORDER BY id DESC; + """ + await cursor.execute(query, (int(limit),)) + + # Fetch and parse the blocks + latestBlocks = await cursor.fetchall() + templst = [json.loads(item[0]) for item in latestBlocks] + + # Return response based on backend readiness + if not is_backend_ready(): + return jsonify(warning=BACKEND_NOT_READY_WARNING, latestBlocks=templst), 206 + else: + return jsonify(latestBlocks=templst), 200 + + except Exception as e: + print("latestBlockDetails:", e) + return jsonify(description=INTERNAL_ERROR), 500 + + +@app.route('/api/v2/blockTransactions/', methods=['GET']) +async def blocktransactions(blockHash): + try: + blockJson = await blockdetailhelper(blockHash) + if len(blockJson) != 0: + blockJson = json.loads(blockJson[0][0]) + blocktxlist = blockJson['txs'] + blocktxs = [] + for i in range(len(blocktxlist)): + temptx = await transactiondetailhelper(blocktxlist[i]['txid']) + transactionJson = json.loads(temptx[0][0]) + parseResult = json.loads(temptx[0][1]) + blocktxs.append({**parseResult , **transactionJson}) + + # TODO (CRITICAL): Write conditions to include and filter on chain and offchain transactions + #blocktxs['onChain'] = True + return jsonify(transactions=blocktxs, blockKeyword=blockHash), 200 + else: + if not is_backend_ready(): + return jsonify(description=BACKEND_NOT_READY_ERROR), 503 + else: + return jsonify(description='Block doesn\'t exist in database'), 404 + + + except Exception as e: + print("blocktransactions:", e) + return jsonify(description=INTERNAL_ERROR), 500 + + +@app.route('/api/v2/categoriseString/', methods=['GET']) +async def categoriseString_v2(urlstring): + try: + # Check if the string is a transaction or block hash + async with aiohttp.ClientSession() as session: + # Check if it's a transaction hash + async with session.get(f"{apiUrl}api/v1/tx/{urlstring}") as response: + if response.status == 200: + return jsonify(type='transaction'), 200 + + # Check if it's a block hash + async with session.get(f"{apiUrl}api/v1/block/{urlstring}") as response: + if response.status == 200: + return jsonify(type='block'), 200 + + # Check if the string is a token name or a smart contract name + db_name = standardize_db_name("system") + async with await get_mysql_connection(db_name, USE_ASYNC=True) as conn: + async with conn.cursor() as cursor: + # Check if it's a token + query = """ + SELECT DISTINCT tokenName + FROM tokenInfo + WHERE LOWER(tokenName) = %s; + """ + await cursor.execute(query, (urlstring.lower(),)) + token_result = await cursor.fetchone() + if token_result: + return jsonify(type='token'), 200 + + # Check if it's a smart contract name + query = """ + SELECT DISTINCT contractName + FROM activeContracts + WHERE LOWER(contractName) = %s; + """ + await cursor.execute(query, (urlstring.lower(),)) + contract_result = await cursor.fetchone() + if contract_result: + return jsonify(type='smartContract'), 200 + + # If no match, classify as noise + return jsonify(type='noise'), 200 + + except Exception as e: + print("categoriseString_v2:", e) + return jsonify(description="Internal Error"), 500 + + + + +# Assuming `get_mysql_connection` has been updated to work with aiomysql and async + +@app.route('/api/v2/tokenSmartContractList', methods=['GET']) +async def tokenSmartContractList(): + try: + # Prefix for databases + database_prefix = f"{mysql_config.database_prefix}_" + database_suffix = "_db" + + # Initialize lists for tokens and contracts + token_list = [] + + # Step 1: Enumerate all databases asynchronously + async with await get_mysql_connection("information_schema", no_standardize=True, USE_ASYNC=True) as conn_info: + async with conn_info.cursor() as cursor: + query = f""" + SELECT SCHEMA_NAME + FROM SCHEMATA + WHERE SCHEMA_NAME LIKE '{database_prefix}%' + AND SCHEMA_NAME LIKE '%{database_suffix}' + """ + await cursor.execute(query) + all_databases = await cursor.fetchall() + + # Filter token databases from smart contract databases + for db_name in all_databases: + stripped_name = db_name[0][len(database_prefix):-len(database_suffix)] + + # Exclude "latestCache" and "system" databases + if stripped_name in ["latestCache", "system"]: + continue + + parts = stripped_name.split('_') + if len(parts) == 1: # Token database format (e.g., usd, inr) + token_list.append(stripped_name) + + # Step 2: Fetch smart contracts from the `system` database + async with await get_mysql_connection("system", USE_ASYNC=True) as conn_system: + async with conn_system.cursor() as cursor: + # Validate and process query parameters + contractName = request.args.get('contractName') + contractAddress = request.args.get('contractAddress') + + if contractName: + contractName = contractName.strip().lower() + if contractAddress: + contractAddress = contractAddress.strip() + if not check_flo_address(contractAddress, is_testnet): + return jsonify(description='contractAddress validation failed'), 400 + + # Fetch smart contracts matching the filters + query = """ + SELECT * + FROM activeContracts + WHERE (%s IS NULL OR LOWER(contractName) = %s) + AND (%s IS NULL OR contractAddress = %s) + """ + await cursor.execute(query, (contractName, contractName, contractAddress, contractAddress)) + smart_contracts = await cursor.fetchall() + + # Morph the smart contract data for response formatting + smart_contracts_morphed = await smartcontract_morph_helper(smart_contracts) + + # Step 3: Fetch committee address list + committeeAddressList = await refresh_committee_list(APP_ADMIN, apiUrl, int(time.time())) + + # Step 4: Prepare and send the response + response = { + "tokens": token_list, + "smartContracts": smart_contracts_morphed, + "smartContractCommittee": committeeAddressList + } + + if not is_backend_ready(): + response["warning"] = BACKEND_NOT_READY_WARNING + return jsonify(response), 206 + else: + return jsonify(response), 200 + + except Exception as e: + print("tokenSmartContractList:", e) + return jsonify(description=INTERNAL_ERROR), 500 + + + + + + +class ServerSentEvent: + def __init__( + self, + data: str, + *, + event: Optional[str] = None, + id: Optional[int] = None, + retry: Optional[int] = None, + ) -> None: + self.data = data + self.event = event + self.id = id + self.retry = retry + + def encode(self) -> bytes: + message = f"data: {self.data}" + if self.event is not None: + message = f"{message}\nevent: {self.event}" + if self.id is not None: + message = f"{message}\nid: {self.id}" + if self.retry is not None: + message = f"{message}\nretry: {self.retry}" + message = f"{message}\r\n\r\n" + return message.encode('utf-8') + +@app.route('/sse') +async def sse(): + queue = asyncio.Queue() + app.clients.add(queue) + + async def send_events(): + while True: + try: + data = await queue.get() + event = ServerSentEvent(data) + yield event.encode() + except asyncio.CancelledError as error: + app.clients.remove(queue) + + response = await make_response( + send_events(), + { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + 'Transfer-Encoding': 'chunked', + }, + ) + response.timeout = None + return response + + +@app.route('/api/v2/prices', methods=['GET']) +async def priceData(): + try: + # Standardize the system database name + db_name = standardize_db_name("system") + + # Connect to the database asynchronously + async with await get_mysql_connection(db_name, USE_ASYNC=True) as conn: + async with conn.cursor() as cursor: + # Query to fetch rate pairs and prices + query = "SELECT ratepair, price FROM ratepairs;" + await cursor.execute(query) + ratepairs = await cursor.fetchall() + + # Prepare a dictionary of prices + prices = {ratepair[0]: ratepair[1] for ratepair in ratepairs} + + # Return the prices + return jsonify(prices=prices), 200 + + except Exception as e: + print("priceData:", e) + return jsonify(description="Internal Error"), 500 + + + +####################### +####################### + +async def initialize_db(): + """ + Initializes the `ratepairs` table in the `system` database if it does not exist, + and populates it with default values. + """ + try: + # Standardize database name for the system database + db_name = standardize_db_name("system") + + # Connect to the database asynchronously + async with await get_mysql_connection(db_name, USE_ASYNC=True) as conn: + async with conn.cursor() as cursor: + # Create the `ratepairs` table if it does not exist + await cursor.execute(""" + CREATE TABLE IF NOT EXISTS ratepairs ( + id INT AUTO_INCREMENT PRIMARY KEY, + ratepair VARCHAR(20) NOT NULL UNIQUE, + price FLOAT NOT NULL + ) + """) + + # Check if the table contains any data + await cursor.execute("SELECT COUNT(*) FROM ratepairs") + count = (await cursor.fetchone())[0] + + if count == 0: + # Insert default rate pairs if the table is empty + default_ratepairs = [ + ('BTCBTC', 1), + ('BTCUSD', -1), + ('BTCINR', -1), + ('FLOUSD', -1), + ('FLOINR', -1), + ('USDINR', -1) + ] + await cursor.executemany( + "INSERT INTO ratepairs (ratepair, price) VALUES (%s, %s)", + default_ratepairs + ) + await conn.commit() + + # Update the prices + await updatePrices() + + except Exception as e: + print(f"Error initializing the database: {e}") + + + + +def set_configs(config): + global DATA_PATH, apiUrl, FLO_DATA_DIR, API_VERIFY, debug_status, APIHOST, APIPORT, APP_ADMIN, NET, is_testnet + + # Handle paths and API details + DATA_PATH = config.get("API", "dbfolder", fallback="") + apiUrl = config.get("API", "apiUrl", fallback="https://blockbook.ranchimall.net/api/") + FLO_DATA_DIR = config.get("API", "FLO_DATA_DIR", fallback="") + + # Handle API verification + API_VERIFY = config.getboolean("DEFAULT", "API_VERIFY", fallback=True) + + # Debug status and server details + debug_status = config.getboolean("API", "debug_status", fallback=False) + APIHOST = config.get("API", "HOST", fallback="localhost") + APIPORT = config.getint("API", "PORT", fallback=5432) + + # Admin and network settings + APP_ADMIN = config.get("DEFAULT", "APP_ADMIN", fallback="") + NET = config.get("DEFAULT", "NET", fallback="mainnet") + + # Determine if running on testnet + is_testnet = NET == 'testnet' + + + +# This function will run the async updatePrices within the event loop +async def run_async_update_prices(): + """Runs the async updatePrices function within the event loop.""" + await updatePrices() # Use await to run the async function + +async def shutdown(loop, signal=None): + """ + Gracefully shuts down the event loop and cancels all pending tasks. + """ + if signal: + print(f"Received exit signal {signal.name}. Shutting down...") + + tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] + + for task in tasks: + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + loop.stop() + +def init_process(): + loop = asyncio.get_event_loop() + loop.create_task(initialize_db()) + + scheduler = BackgroundScheduler() + scheduler.add_job(lambda: asyncio.create_task(run_async_update_prices()), trigger="interval", seconds=600) + scheduler.start() + + atexit.register(lambda: asyncio.run(shutdown(loop))) + +def start_api_server(config): + set_configs(config) + init_process() + print("Starting API server at port=", APIPORT) + app.run(debug=debug_status, host=APIHOST, port=APIPORT) + +if __name__ == "__main__": + start_api_server(config)