diff --git a/tracktokens_smartcontracts.py b/tracktokens_smartcontracts.py index b0ff257..eabfb46 100755 --- a/tracktokens_smartcontracts.py +++ b/tracktokens_smartcontracts.py @@ -5,15 +5,10 @@ import logging import os import shutil import sys -import pyflo +#import pyflo import requests from sqlalchemy import create_engine, func, and_ from sqlalchemy.orm import sessionmaker -from sqlalchemy.sql import text -from sqlalchemy.exc import OperationalError, IntegrityError, ProgrammingError, SQLAlchemyError -from sqlalchemy import MetaData -from sqlalchemy import BigInteger, Column -import pymysql import time import arrow import parsing @@ -25,165 +20,16 @@ from models import SystemData, TokenBase, ActiveTable, ConsumedTable, TransferLo from statef_processing import process_stateF import asyncio import websockets -import hashlib from decimal import Decimal import pdb from util_rollback import rollback_to_block - import requests from requests.packages.urllib3.exceptions import InsecureRequestWarning # Disable the InsecureRequestWarning requests.packages.urllib3.disable_warnings(InsecureRequestWarning) -#ENHANCEMENTS START -import glob - -def add_block_hashrecord(block_number, block_hash): - """ - Adds a new block entry to the RecentBlocks table if it does not already exist. - Maintains only the latest 1000 blocks by removing the oldest entry if necessary. - - :param block_number: The block number of the new block. - :param block_hash: The hash of the new block. - """ - while True: - try: - conn = create_database_connection('latest_cache', {'db_name': "latestCache"}) - break - except Exception as e: - logger.error(f"Error connecting to database: {e}") - time.sleep(DB_RETRY_TIMEOUT) - - try: - # Check if the block already exists - existing_block = conn.execute( - 'SELECT * FROM RecentBlocks WHERE blockNumber = %s', (block_number,) - ).fetchone() - if existing_block: - logger.info(f"Block {block_number} already exists. No action taken.") - return - - # Add the new block entry - conn.execute( - 'INSERT INTO RecentBlocks (blockNumber, blockHash) VALUES (%s, %s)', - (block_number, block_hash) - ) - logger.info(f"Added hash of block {block_number} with hash {block_hash} to detect reorganization of chain.") - - # Check the count of blocks - block_count = conn.execute('SELECT COUNT(*) FROM RecentBlocks').fetchone()[0] - - # If more than 1000 blocks, delete the oldest entries - if block_count > 1000: - # Determine how many blocks to remove - excess_count = block_count - 1000 - oldest_blocks = conn.execute( - 'SELECT id FROM RecentBlocks ORDER BY id ASC LIMIT %s', - (excess_count,) - ).fetchall() - for block in oldest_blocks: - conn.execute('DELETE FROM RecentBlocks WHERE id = %s', (block[0],)) - logger.info( - f"Deleted {excess_count} oldest block(s) to maintain the limit of 1000 blocks." - ) - - except Exception as e: - logger.error(f"Database error: {e}") - finally: - conn.close() - - -def detect_reorg(): - """ - Detects a blockchain reorganization by finding a potential fork point. - Returns the fork point block number if a fork is detected, otherwise returns None. - - :return: Block number of the fork point, or None if no fork is detected. - """ - # Constants - BLOCKBOOK_API_URL = 'https://blockbook.ranchimall.net/' - API_VERIFY = True - ROLLBACK_BUFFER = 2 # Number of blocks before the fork point for rollback - - try: - # Connect to the system database to get the last scanned block - logger.info("Connecting to the system database to fetch the last scanned block...") - conn = create_database_connection('system_dbs', {'db_name': 'system'}) - result = conn.execute("SELECT value FROM systemData WHERE attribute = %s", ('lastblockscanned',)) - row = result.fetchone() - conn.close() - - if row is None: - logger.error("No last scanned block found in the system database. Exiting detection.") - return None - - try: - latest_block = int(row[0]) - logger.info(f"Last scanned block retrieved: {latest_block}") - except ValueError: - logger.error("Invalid block number in the system database. Exiting detection.") - return None - - block_number = latest_block - while block_number > 0: - logger.info(f"Fetching local block hash for block number: {block_number}") - conn = create_database_connection('latest_cache', {'db_name': 'latestCache'}) - result = conn.execute("SELECT blockHash FROM RecentBlocks WHERE blockNumber = %s", (block_number,)) - local_row = result.fetchone() - conn.close() - - if local_row is None: - logger.error(f"No data found for block {block_number} in recentBlocks. Exiting detection.") - return None # No block data available locally - - local_block_hash = local_row[0] - logger.info(f"Local block hash for block {block_number}: {local_block_hash}") - - # Fetch the block from Blockbook API - try: - logger.info(f"Fetching block hash from Blockbook API for block number: {block_number}") - api_url = f'{BLOCKBOOK_API_URL}api/block/{block_number}' - logger.info(f"API Request URL: {api_url}") - - response = requests.get(api_url, verify=API_VERIFY, timeout=RETRY_TIMEOUT_SHORT) - response.raise_for_status() # Raise HTTP errors - response_json = response.json() - - if "hash" in response_json: - blockbook_hash = response_json["hash"] - logger.info(f"Blockbook hash for block {block_number}: {blockbook_hash}") - else: - logger.error(f"Missing 'hash' key in Blockbook API response: {response_json}") - return None - - # Check if the local block matches Blockbook's hash - if local_block_hash == blockbook_hash: - logger.info(f"Block {block_number} matches between local and Blockbook. No reorg detected.") - return None # No reorg detected - else: - logger.warning(f"Reorg detected at block {block_number}. Local hash: {local_block_hash}, Blockbook hash: {blockbook_hash}") - return block_number - ROLLBACK_BUFFER - - except requests.RequestException as e: - logger.error(f"Error connecting to Blockbook API: {e}") - time.sleep(2) - continue - - except Exception as e: - logger.error(f"Unexpected error during Blockbook API call: {e}") - return None - - except Exception as e: - logger.error(f"Unexpected error during reorg detection: {e}") - return None - - - -#ENHANCEMENTS END - - RETRY_TIMEOUT_LONG = 30 * 60 # 30 mins @@ -192,36 +38,22 @@ DB_RETRY_TIMEOUT = 60 # 60 seconds def newMultiRequest(apicall): - current_server = serverlist[0] # Start with the first server - retry_count = 0 - - while True: # Infinite loop + current_server = serverlist[0] + while True: try: - # Add a timeout of 10 seconds - response = requests.get(f"{current_server}api/v1/{apicall}", verify=API_VERIFY, timeout=RETRY_TIMEOUT_SHORT) + response = requests.get(f"{current_server}api/v1/{apicall}", verify=API_VERIFY) logger.info(f"Called the API {current_server}api/v1/{apicall}") if response.status_code == 200: - try: - return response.json() # Attempt to parse the JSON response - except ValueError as e: - logger.error(f"Failed to parse JSON response: {e}") - raise ValueError("Invalid JSON response received.") + return response.json() # Use the built-in .json() method else: - logger.warning(f"Non-200 status code received: {response.status_code}") - logger.warning(f"Response content: {response.content}") - raise requests.exceptions.HTTPError(f"HTTP {response.status_code}") - - except requests.exceptions.RequestException as e: - logger.error(f"Request exception: {e}. Switching server...") + logger.info(f"Response status code - \n{response.status_code}") + logger.info(f"Response content -\n{response.content}") + raise Exception("Non-200 status code") except Exception as e: - logger.error(f"Unhandled exception: {e}. Switching server...") - - # Switch to the next server - current_server = switchNeturl(current_server) - retry_count += 1 - logger.info(f"Switched to {current_server}. Retrying... Attempt #{retry_count}") - time.sleep(2) # Wait before retrying - + logger.info(f"newMultiRequest() exception: {e}. Switching server...") + current_server = switchNeturl(current_server) + logger.info(f"newMultiRequest() switched to {current_server}") + time.sleep(2) def pushData_SSEapi(message): @@ -261,7 +93,7 @@ def refresh_committee_list_old(admin_flo_id, api_url, blocktime): if response.status_code == 200: response = response.json() else: - logger.info('Response from the Blockbook API failed') + logger.info('Response from the Flosight API failed') sys.exit(0) committee_list = [] @@ -302,11 +134,11 @@ def refresh_committee_list(admin_flo_id, api_url, blocktime): if response.status_code == 200: return response.json() else: - logger.info(f'Response from the Blockbook API failed. Retry in {RETRY_TIMEOUT_SHORT}s') + logger.info(f'Response from the Flosight API failed. Retry in {RETRY_TIMEOUT_SHORT}s') #sys.exit(0) time.sleep(RETRY_TIMEOUT_SHORT) except: - logger.info(f'Fetch from the Blockbook API failed. Retry in {RETRY_TIMEOUT_LONG}s...') + logger.info(f'Fetch from the Flosight API failed. Retry in {RETRY_TIMEOUT_LONG}s...') time.sleep(RETRY_TIMEOUT_LONG) url = f'{api_url}api/v1/address/{admin_flo_id}?details=txs' @@ -382,162 +214,59 @@ def find_sender_receiver(transaction_data): def check_database_existence(type, parameters): - """ - Checks the existence of a MySQL database by attempting to connect to it. - - Args: - type (str): Type of the database ('token', 'smart_contract'). - parameters (dict): Parameters for constructing database names. - - Returns: - bool: True if the database exists, False otherwise. - """ - - # Construct database name and URL if type == 'token': - database_name = f"{mysql_config.database_prefix}_{parameters['token_name']}_db" - elif type == 'smart_contract': - database_name = f"{mysql_config.database_prefix}_{parameters['contract_name']}_{parameters['contract_address']}_db" - else: - raise ValueError(f"Unsupported database type: {type}") - - # Create a temporary engine to check database existence - engine_url = f"mysql+pymysql://{mysql_config.username}:{mysql_config.password}@{mysql_config.host}/{database_name}" - try: - engine = create_engine(engine_url, echo=False) - connection = engine.connect() - connection.close() - return True - except OperationalError: - return False + path = os.path.join(config['DEFAULT']['DATA_PATH'], 'tokens', f'{parameters["token_name"]}.db') + return os.path.isfile(path) + + if type == 'smart_contract': + path = os.path.join(config['DEFAULT']['DATA_PATH'], 'smartContracts', f"{parameters['contract_name']}-{parameters['contract_address']}.db") + return os.path.isfile(path) def create_database_connection(type, parameters=None): - """ - Creates a database connection using MySQL credentials from the config file. + if type == 'token': + path = os.path.join(config['DEFAULT']['DATA_PATH'], 'tokens', f"{parameters['token_name']}.db") + engine = create_engine(f"sqlite:///{path}", echo=True) + elif type == 'smart_contract': + path = os.path.join(config['DEFAULT']['DATA_PATH'], 'smartContracts', f"{parameters['contract_name']}-{parameters['contract_address']}.db") + engine = create_engine(f"sqlite:///{path}", echo=True) + elif type == 'system_dbs': + path = os.path.join(config['DEFAULT']['DATA_PATH'], f"system.db") + engine = create_engine(f"sqlite:///{path}", echo=False) + elif type == 'latest_cache': + path = os.path.join(config['DEFAULT']['DATA_PATH'], f"latestCache.db") + engine = create_engine(f"sqlite:///{path}", echo=False) - Args: - type (str): Type of the database ('token', 'smart_contract', 'system_dbs', 'latest_cache'). - parameters (dict, optional): Parameters for dynamic database names. + connection = engine.connect() + return connection - Returns: - connection: SQLAlchemy connection object. - """ - - # Map database type to naming logic - database_mapping = { - 'token': lambda: f"{mysql_config.database_prefix}_{parameters['token_name']}_db", - 'smart_contract': lambda: f"{mysql_config.database_prefix}_{parameters['contract_name']}_{parameters['contract_address']}_db", - 'system_dbs': lambda: f"{mysql_config.database_prefix}_system_db", - 'latest_cache': lambda: f"{mysql_config.database_prefix}_latestCache_db" - } - - # Validate and construct the database name - if type not in database_mapping: - raise ValueError(f"Unknown database type: {type}") - database_name = database_mapping[type]() - - # Create the database engine - echo_setting = True if type in ['token', 'smart_contract'] else False - engine = create_engine(f"mysql+pymysql://{mysql_config.username}:{mysql_config.password}@{mysql_config.host}/{database_name}", echo=echo_setting) - - # Connect to the database - return engine.connect() - - - -from sqlalchemy.exc import SQLAlchemyError def create_database_session_orm(type, parameters, base): - """ - Creates a SQLAlchemy session for the specified database type, ensuring the database exists. - - Args: - type (str): Type of the database ('token', 'smart_contract', 'system_dbs'). - parameters (dict): Parameters for constructing database names. - base: SQLAlchemy declarative base for the ORM models. - - Returns: - session: SQLAlchemy session object. - """ - try: - # Construct database name based on type - if type == 'token': - database_name = f"{mysql_config.database_prefix}_{parameters['token_name']}_db" - elif type == 'smart_contract': - database_name = f"{mysql_config.database_prefix}_{parameters['contract_name']}_{parameters['contract_address']}_db" - elif type == 'system_dbs': - database_name = f"{mysql_config.database_prefix}_{parameters['db_name']}_db" - else: - raise ValueError(f"Unknown database type: {type}") - - #logger.info(f"Database name constructed: {database_name}") - - # Check if the database exists using information_schema - server_engine = create_engine( - f"mysql+pymysql://{mysql_config.username}:{mysql_config.password}@{mysql_config.host}/", - connect_args={"connect_timeout": DB_RETRY_TIMEOUT}, # 10 seconds timeout for connection - echo=False - ) - with server_engine.connect() as connection: - #logger.info(f"Checking existence of database '{database_name}'...") - db_exists = connection.execute( - "SELECT COUNT(*) FROM information_schema.schemata WHERE schema_name = %s", - (database_name,) - ).scalar() - - if not db_exists: - logger.info(f"Database '{database_name}' does not exist. Creating it...") - connection.execute(f"CREATE DATABASE `{database_name}`") - logger.info(f"Database '{database_name}' created successfully.") - else: - logger.info(f"Database '{database_name}' already exists.") - - # Connect to the specific database and initialize tables - logger.info(f"Connecting to database '{database_name}'...") - engine = create_engine( - f"mysql+pymysql://{mysql_config.username}:{mysql_config.password}@{mysql_config.host}/{database_name}", - connect_args={"connect_timeout": DB_RETRY_TIMEOUT}, - echo=False - ) - base.metadata.create_all(bind=engine) # Create tables if they do not exist + if type == 'token': + path = os.path.join(config['DEFAULT']['DATA_PATH'], 'tokens', f"{parameters['token_name']}.db") + engine = create_engine(f"sqlite:///{path}", echo=True) + base.metadata.create_all(bind=engine) session = sessionmaker(bind=engine)() - logger.info(f"Session created for database '{database_name}' successfully.") - return session - - except SQLAlchemyError as e: - logger.error(f"SQLAlchemy error occurred: {e}") - raise - except Exception as e: - logger.error(f"Unexpected error occurred: {e}") - raise - + elif type == 'smart_contract': + path = os.path.join(config['DEFAULT']['DATA_PATH'], 'smartContracts', f"{parameters['contract_name']}-{parameters['contract_address']}.db") + engine = create_engine(f"sqlite:///{path}", echo=True) + base.metadata.create_all(bind=engine) + session = sessionmaker(bind=engine)() + elif type == 'system_dbs': + path = os.path.join(config['DEFAULT']['DATA_PATH'], f"{parameters['db_name']}.db") + engine = create_engine(f"sqlite:///{path}", echo=False) + base.metadata.create_all(bind=engine) + session = sessionmaker(bind=engine)() + + return session def delete_contract_database(parameters): - """ - Deletes a MySQL database for a smart contract if it exists. - - Args: - parameters (dict): Parameters for constructing the database name. - Example: {'contract_name': 'example_contract', 'contract_address': '0x123abc'} - """ - - # Construct the database name - database_name = f"{mysql_config.database_prefix}_{parameters['contract_name']}_{parameters['contract_address']}_db" - - # Check if the database exists - if check_database_existence('smart_contract', parameters): - # Connect to MySQL server (without specifying a database) - engine = create_engine(f"mysql+pymysql://{mysql_config.username}:{mysql_config.password}@{mysql_config.host}/", echo=False) - with engine.connect() as connection: - # Drop the database - connection.execute(f"DROP DATABASE `{database_name}`") - logger.info(f"Database '{database_name}' has been deleted.") - else: - logger.info(f"Database '{database_name}' does not exist.") + if check_database_existence('smart_contract', {'contract_name':f"{parameters['contract_name']}", 'contract_address':f"{parameters['contract_address']}"}): + path = os.path.join(config['DEFAULT']['DATA_PATH'], 'smartContracts', f"{parameters['contract_name']}-{parameters['contract_address']}.db") + os.remove(path) def add_transaction_history(token_name, sourceFloAddress, destFloAddress, transferAmount, blockNumber, blockHash, blocktime, transactionHash, jsonData, transactionType, parsedFloData): @@ -594,7 +323,7 @@ def add_contract_transaction_history(contract_name, contract_address, transactio def rejected_transaction_history(transaction_data, parsed_data, sourceFloAddress, destFloAddress, rejectComment): while True: try: - session = create_database_session_orm('system_dbs', {'db_name': "system"}, SystemBase) + session = create_database_session_orm('system_dbs', {'db_name': "system"}, TokenBase) blockchainReference = neturl + 'tx/' + transaction_data['txid'] session.add(RejectedTransactionHistory(tokenIdentification=parsed_data['tokenIdentification'], sourceFloAddress=sourceFloAddress, destFloAddress=destFloAddress, @@ -776,132 +505,70 @@ def fetchDynamicSwapPrice(contractStructure, blockinfo): return float(contractStructure['price']) -def processBlock(blockindex=None, blockhash=None, blockinfo=None, keywords=None): - """ - Processes a block with optional keyword filtering. - - :param blockindex: The block index (height) to process. - :param blockhash: The block hash to process. - :param blockinfo: The block data to process. If not provided, it will be fetched. - :param keywords: List of keywords to filter transactions. If None, processes all transactions. - """ - global args - while True: # Loop to handle rollbacks - # Retrieve block information if not already provided - if blockinfo is None: - if blockindex is not None and blockhash is None: - logger.info(f'Processing block {blockindex}') - while blockhash is None or blockhash == '': - response = newMultiRequest(f"block-index/{blockindex}") - try: - blockhash = response['blockHash'] - except: - logger.info(f"API call block-index/{blockindex} failed to give proper response. Retrying.") - - blockinfo = newMultiRequest(f"block/{blockhash}") - - # Filter based on keywords if provided - if keywords: - should_process = any( - any(keyword.lower() in transaction_data.get("floData", "").lower() for keyword in keywords) - for transaction_data in blockinfo.get('txs', []) - ) - if not should_process: - logger.info(f"Block {blockindex} does not contain relevant keywords. Skipping processing.") - break - - # Add block to the database (this shouldn't prevent further processing) - block_already_exists = False - try: - block_already_exists = add_block_hashrecord(blockinfo["height"], blockinfo["hash"]) - except Exception as e: - logger.error(f"Error adding block {blockinfo['height']} to the database: {e}") - - # Ensure processing continues even if the block exists - if block_already_exists: - logger.info(f"Block {blockindex} already exists but will continue processing its transactions.") - - # Detect reorg every 10 blocks - if not args.rebuild and blockindex is not None and blockindex % 10 == 0: - fork_point = detect_reorg() - if fork_point is not None: - logger.warning(f"Blockchain reorganization detected! Fork point at block {fork_point}.") - - # Handle rollback - rollback_to_block(fork_point) - - # Restart processing from fork point - blockindex = fork_point + 1 - blockhash = None - blockinfo = None - - # Fetch new blockhash and blockinfo for the updated blockindex - while blockhash is None or blockhash == '': - response = newMultiRequest(f"block-index/{blockindex}") - try: - blockhash = response['blockHash'] - except: - logger.info(f"API call block-index/{blockindex} failed to give proper response. Retrying.") - - # Fetch blockinfo for the new blockhash - blockinfo = newMultiRequest(f"block/{blockhash}") - - logger.info(f"Rollback complete. Restarting processing from block {blockindex} with hash {blockhash}.") - continue # Restart loop from updated fork point - - # Perform expiry and deposit trigger checks - checkLocal_time_expiry_trigger_deposit(blockinfo) - - # Process transactions in the block - acceptedTxList = [] - logger.info("Before tx loop") - - for transaction_data in blockinfo["txs"]: - transaction = transaction_data["txid"] - - try: - text = transaction_data["floData"] +def processBlock(blockindex=None, blockhash=None): + if blockindex is not None and blockhash is None: + logger.info(f'Processing block {blockindex}') + # Get block details + while blockhash is None or blockhash == '': + response = newMultiRequest(f"block-index/{blockindex}") + try: + blockhash = response['blockHash'] except: - text = '' - text = text.replace("\n", " \n ") - returnval = None - parsed_data = parsing.parse_flodata(text, blockinfo, config['DEFAULT']['NET']) - if parsed_data['type'] not in ['noise', None, '']: - logger.info(f"Processing transaction {transaction}") - logger.info(f"flodata {text} is parsed to {parsed_data}") - returnval = processTransaction(transaction_data, parsed_data, blockinfo) + logger.info(f"API call block-index/{blockindex} failed to give proper response. Retrying.") - if returnval == 1: - acceptedTxList.append(transaction) - elif returnval == 0: - logger.info(f"Transfer for the transaction {transaction} is illegitimate. Moving on") + blockinfo = newMultiRequest(f"block/{blockhash}") - logger.info("Completed tx loop") + #TODO: Check for reorg in here - if len(acceptedTxList) > 0: - tempinfo = blockinfo['txs'].copy() - for tx in blockinfo['txs']: - if tx['txid'] not in acceptedTxList: - tempinfo.remove(tx) - blockinfo['txs'] = tempinfo + # Check and perform operations which do not require blockchain intervention + checkLocal_expiry_trigger_deposit(blockinfo) - try: - updateLatestBlock(blockinfo) # Core logic to update - logger.info(f"Successfully updated latest block: {blockinfo['height']}") - except Exception as e: - logger.error(f"Error updating latest block {blockinfo['height']} in updateLatestBlock: {e}") + # todo Rule 8 - read every transaction from every block to find and parse flodata + counter = 0 + acceptedTxList = [] + # Scan every transaction + logger.info("Before tx loop") + for transaction_data in blockinfo["txs"]: + transaction = transaction_data["txid"] + try: - session = create_database_session_orm('system_dbs', {'db_name': "system"}, SystemBase) - entry = session.query(SystemData).filter(SystemData.attribute == 'lastblockscanned').all()[0] - entry.value = str(blockinfo['height']) - session.commit() - session.close() - except Exception as e: - logger.error(f"Error connecting to 'system' database: {e}. Retrying in {DB_RETRY_TIMEOUT} seconds") - time.sleep(DB_RETRY_TIMEOUT) + text = transaction_data["floData"] + except: + text = '' + text = text.replace("\n", " \n ") + # todo Rule 9 - Reject all noise transactions. Further rules are in parsing.py + returnval = None + parsed_data = parsing.parse_flodata(text, blockinfo, config['DEFAULT']['NET']) + if parsed_data['type'] not in ['noise', None, '']: + logger.info(f"Processing transaction {transaction}") + logger.info(f"flodata {text} is parsed to {parsed_data}") + returnval = processTransaction(transaction_data, parsed_data, blockinfo) - break # Exit loop after processing is complete without a rollback + if returnval == 1: + acceptedTxList.append(transaction) + elif returnval == 0: + logger.info("Transfer for the transaction %s is illegitimate. Moving on" % transaction) + + logger.info("Completed tx loop") + + if len(acceptedTxList) > 0: + tempinfo = blockinfo['txs'].copy() + for tx in blockinfo['txs']: + if tx['txid'] not in acceptedTxList: + tempinfo.remove(tx) + blockinfo['txs'] = tempinfo + updateLatestBlock(blockinfo) + + try: + session = create_database_session_orm('system_dbs', {'db_name': "system"}, SystemBase) + entry = session.query(SystemData).filter(SystemData.attribute == 'lastblockscanned').all()[0] + entry.value = str(blockinfo['height']) + session.commit() + session.close() + except: + logger.info(f"Unable to connect to 'system' database... retrying in {DB_RETRY_TIMEOUT} seconds") + time.sleep(DB_RETRY_TIMEOUT) def updateLatestTransaction(transactionData, parsed_data, db_reference, transactionType=None ): @@ -914,13 +581,9 @@ def updateLatestTransaction(transactionData, parsed_data, db_reference, transact time.sleep(DB_RETRY_TIMEOUT) if transactionType is None: transactionType = parsed_data['type'] - try: - conn.execute("INSERT INTO latestTransactions (transactionHash, blockNumber, jsonData, transactionType, parsedFloData, db_reference) VALUES (%s, %s, %s, %s, %s, %s)", (transactionData['txid'], transactionData['blockheight'], json.dumps(transactionData), transactionType, json.dumps(parsed_data), db_reference)) - except Exception as e: - logger.error(f"Error inserting into latestTransactions: {e}") - finally: - conn.close() - + conn.execute("INSERT INTO latestTransactions(transactionHash, blockNumber, jsonData, transactionType, parsedFloData, db_reference) VALUES (?,?,?,?,?,?)", (transactionData['txid'], transactionData['blockheight'], json.dumps(transactionData), transactionType, json.dumps(parsed_data), db_reference)) + #conn.commit() + conn.close() def updateLatestBlock(blockData): @@ -931,7 +594,7 @@ def updateLatestBlock(blockData): break except: time.sleep(DB_RETRY_TIMEOUT) - conn.execute('INSERT INTO latestBlocks(blockNumber, blockHash, jsonData) VALUES (%s, %s, %s)',(blockData['height'], blockData['hash'], json.dumps(blockData))) + conn.execute('INSERT INTO latestBlocks(blockNumber, blockHash, jsonData) VALUES (?,?,?)', (blockData['height'], blockData['hash'], json.dumps(blockData))) #conn.commit() conn.close() @@ -987,7 +650,6 @@ def transferToken(tokenIdentification, tokenAmount, inputAddress, outputAddress, query_data = session.query(ActiveTable.transferBalance).filter_by(address=inputAddress).all() availableTokens = float(sum(Decimal(f"{amount[0]}") if amount[0] is not None else Decimal(0) for amount in query_data)) - logger.info(f"The sender address {inputAddress} owns {availableTokens} {tokenIdentification.upper()} tokens") commentTransferAmount = float(tokenAmount) if availableTokens is None: @@ -996,12 +658,11 @@ def transferToken(tokenIdentification, tokenAmount, inputAddress, outputAddress, return 0 elif availableTokens < commentTransferAmount: - logger.info("The transfer amount is more than the user balance\nThis transaction will be discarded\n") + logger.info("The transfer amount passed in the comments is more than the user owns\nThis transaction will be discarded\n") session.close() return 0 elif availableTokens >= commentTransferAmount: - logger.info(f"System has accepted transfer of {commentTransferAmount} {tokenIdentification.upper()}# from {inputAddress} to {outputAddress}") table = session.query(ActiveTable).filter(ActiveTable.address == inputAddress).all() pidlst = [] checksum = 0 @@ -1120,7 +781,7 @@ def transferToken(tokenIdentification, tokenAmount, inputAddress, outputAddress, return 1 -def trigger_internal_contract_onvalue(tokenAmount_sum, contractStructure, transaction_data, blockinfo, parsed_data, connection, contract_name, contract_address, transaction_subType): +def trigger_internal_contract(tokenAmount_sum, contractStructure, transaction_data, blockinfo, parsed_data, connection, contract_name, contract_address, transaction_subType): # Trigger the contract if tokenAmount_sum <= 0: # Add transaction to ContractTransactionHistory @@ -1181,7 +842,7 @@ def process_maximum_subscriptionamount(contractStructure, connection, status, bl if tokenAmount_sum >= maximumsubscriptionamount: # Trigger the contract if status == 'close': - success_returnval = trigger_internal_contract_onvalue(tokenAmount_sum, contractStructure, transaction_data, blockinfo, parsed_data, connection, contract_name=contractStructure['contractName'], contract_address=contractStructure['contractAddress'], transaction_subType='maximumsubscriptionamount') + success_returnval = trigger_internal_contract(tokenAmount_sum, contractStructure, transaction_data, blockinfo, parsed_data, connection, contract_name=contractStructure['contractName'], contract_address=contractStructure['contractAddress'], transaction_subType='maximumsubscriptionamount') if not success_returnval: return 0 return 1 @@ -1192,7 +853,7 @@ def process_maximum_subscriptionamount(contractStructure, connection, status, bl def check_contract_status(contractName, contractAddress): # Status of the contract is at 2 tables in system.db # activecontracts and time_actions - # select the last entry from the column + # select the last entry form the colum while True: try: connection = create_database_connection('system_dbs') @@ -1210,39 +871,37 @@ def close_expire_contract(contractStructure, contractStatus, transactionHash, bl break except: time.sleep(DB_RETRY_TIMEOUT) - connection.execute("INSERT INTO activecontracts VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)", (None, contractStructure['contractName'], contractStructure['contractAddress'], contractStatus, contractStructure['tokenIdentification'], contractStructure['contractType'], transactionHash, blockNumber, blockHash, incorporationDate, expiryDate, closeDate)) - connection.execute("INSERT INTO time_actions VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)", (None, trigger_time, trigger_activity, contractStatus, contractName, contractAddress, contractType, tokens_db, parsed_data, transactionHash, blockHeight)) + connection.execute('INSERT INTO activecontracts VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)', (None, contractStructure['contractName'], contractStructure['contractAddress'], contractStatus, contractStructure['tokenIdentification'], contractStructure['contractType'], transactionHash, blockNumber, blockHash, incorporationDate, expiryDate, closeDate)) + connection.execute('INSERT INTO time_actions VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)', (None, trigger_time, trigger_activity, contractStatus, contractName, contractAddress, contractType, tokens_db, parsed_data, transactionHash, blockHeight)) connection.close() -def return_time_active_contracts(session, status='active', activity='contract-time-trigger'): - sql_query = text(""" - SELECT t1.* - FROM time_actions t1 - JOIN ( - SELECT contractName, contractAddress, MAX(id) AS max_id - FROM time_actions - GROUP BY contractName, contractAddress - ) t2 - ON t1.contractName = t2.contractName - AND t1.contractAddress = t2.contractAddress - AND t1.id = t2.max_id - WHERE t1.status = :status AND t1.activity = :activity - """) - active_contracts = session.execute(sql_query, {'status': status, 'activity': activity}).fetchall() +def return_active_contracts(session): + active_contracts = session.execute('''SELECT t1.* FROM time_actions t1 JOIN ( SELECT contractName, contractAddress, MAX(id) AS max_id FROM time_actions GROUP BY contractName, contractAddress ) t2 ON t1.contractName = t2.contractName AND t1.contractAddress = t2.contractAddress AND t1.id = t2.max_id WHERE t1.status = 'active' AND t1.activity = 'contract-time-trigger' ''').all() return active_contracts - -def return_time_active_deposits(session): +def return_active_deposits(session): # find all the deposits which are active # todo - sqlalchemy gives me warning with the following method - subquery_filter = session.query(TimeActions.id).group_by(TimeActions.transactionHash,TimeActions.id).having(func.count(TimeActions.transactionHash)==1).subquery() + subquery_filter = session.query(TimeActions.id).group_by(TimeActions.transactionHash).having(func.count(TimeActions.transactionHash)==1).subquery() active_deposits = session.query(TimeActions).filter(TimeActions.id.in_(subquery_filter.select()), TimeActions.status=='active', TimeActions.activity=='contract-deposit').all() return active_deposits -def process_contract_time_trigger(blockinfo, systemdb_session, active_contracts): +def checkLocal_expiry_trigger_deposit(blockinfo): + # Connect to system.db with a session + while True: + try: + systemdb_session = create_database_session_orm('system_dbs', {'db_name':'system'}, SystemBase) + + break + except: + time.sleep(DB_RETRY_TIMEOUT) + timeactions_tx_hashes = [] + active_contracts = return_active_contracts(systemdb_session) + active_deposits = return_active_deposits(systemdb_session) + for query in active_contracts: query_time = convert_datetime_to_arrowobject(query.time) blocktime = parsing.arrow.get(blockinfo['time']).to('Asia/Kolkata') @@ -1255,25 +914,23 @@ def process_contract_time_trigger(blockinfo, systemdb_session, active_contracts) tx_type = 'trigger' data = [blockinfo['hash'], blockinfo['height'], blockinfo['time'], blockinfo['size'], tx_type] - def _get_txid(data): - """ - Generate a SHA256 hash of the input data. - :param data: The data to be hashed. - :return: The SHA256 hash as a hexadecimal string. - """ - try: - # Ensure data is encoded before hashing - if isinstance(data, str): - data = data.encode('utf-8') - txid = hashlib.sha256(data).hexdigest() - return txid - except Exception as e: - logger.error(f"Failed to generate SHA256 hash: {e}") - raise e + def _get_txid(): + while True: + try: + response = requests.get(f'https://stdops.ranchimall.net/hash?data={data}', verify=API_VERIFY) + if response.status_code == 200: + txid = response.json() + return txid + elif response.status_code == 404: + logger.info(f'Internal trigger has failed (404) for getting txid from stdops.ranchimall.net. Retry in {RETRY_TIMEOUT_LONG}s') + time.sleep(RETRY_TIMEOUT_LONG) + except: + logger.info(f'Internal trigger has failed for getting txid from stdops.ranchimall.net. Retry in {RETRY_TIMEOUT_LONG}s') + time.sleep(RETRY_TIMEOUT_LONG) transaction_data = {} - transaction_data['txid'] = _get_txid(data) + transaction_data['txid'] = _get_txid() transaction_data['blockheight'] = blockinfo['height'] transaction_data['time'] = blockinfo['time'] @@ -1292,18 +949,15 @@ def process_contract_time_trigger(blockinfo, systemdb_session, active_contracts) tokenAmount_sum = float(sum(Decimal(f"{row[0]}") for row in rows)) if tokenAmount_sum >= maximumsubscriptionamount: # Expire the contract - logger.info(f"Maximum Subscription amount {maximumsubscriptionamount} reached for {query.contractName}-{query.contractAddress}. Expiring the contract") close_expire_contract(contractStructure, 'expired', transaction_data['txid'], blockinfo['height'], blockinfo['hash'], activecontracts_table_info.incorporationDate, blockinfo['time'], None, query.time, query.activity, query.contractName, query.contractAddress, query.contractType, query.tokens_db, query.parsed_data, blockinfo['height']) if blocktime > query_time: if 'minimumsubscriptionamount' in contractStructure: if process_minimum_subscriptionamount(contractStructure, connection, blockinfo, transaction_data, parsed_data): - logger.info(f"Contract trigger time {query_time} achieved and Minimimum subscription amount reached for {query.contractName}-{query.contractAddress}. Closing the contract") close_expire_contract(contractStructure, 'closed', transaction_data['txid'], blockinfo['height'], blockinfo['hash'], activecontracts_table_info.incorporationDate, blockinfo['time'], blockinfo['time'], query.time, query.activity, query.contractName, query.contractAddress, query.contractType, query.tokens_db, query.parsed_data, blockinfo['height']) return # Expire the contract - logger.info(f"Contract trigger time {query_time} achieved for {query.contractName}-{query.contractAddress}. Expiring the contract") close_expire_contract(contractStructure, 'expired', transaction_data['txid'], blockinfo['height'], blockinfo['hash'], activecontracts_table_info.incorporationDate, blockinfo['time'], None, query.time, query.activity, query.contractName, query.contractAddress, query.contractType, query.tokens_db, query.parsed_data, blockinfo['height']) elif 'payeeAddress' in contractStructure: # Internal trigger contract type @@ -1315,18 +969,16 @@ def process_contract_time_trigger(blockinfo, systemdb_session, active_contracts) tokenAmount_sum = float(sum(Decimal(f"{row[0]}") for row in rows)) if tokenAmount_sum >= maximumsubscriptionamount: # Trigger the contract - logger.info(f"Triggering the {query.contractName}-{query.contractAddress} as maximum subscription amount {maximumsubscriptionamount} has been reached ") - success_returnval = trigger_internal_contract_onvalue(tokenAmount_sum, contractStructure, transaction_data, blockinfo, parsed_data, connection, contract_name=query.contractName, contract_address=query.contractAddress, transaction_subType='maximumsubscriptionamount') + success_returnval = trigger_internal_contract(tokenAmount_sum, contractStructure, transaction_data, blockinfo, parsed_data, connection, contract_name=query.contractName, contract_address=query.contractAddress, transaction_subType='maximumsubscriptionamount') if not success_returnval: return 0 - logger.info(f"Closing the {query.contractName}-{query.contractAddress} as maximum subscription amount {maximumsubscriptionamount} has been reached ") + close_expire_contract(contractStructure, 'closed', transaction_data['txid'], blockinfo['height'], blockinfo['hash'], activecontracts_table_info.incorporationDate, blockinfo['time'], blockinfo['time'], query.time, query.activity, query.contractName, query.contractAddress, query.contractType, query.tokens_db, query.parsed_data, blockinfo['height']) return if blocktime > query_time: if 'minimumsubscriptionamount' in contractStructure: if process_minimum_subscriptionamount(contractStructure, connection, blockinfo, transaction_data, parsed_data): - logger.info(f"Contract trigger time {query_time} achieved and Minimimum subscription amount reached for {query.contractName}-{query.contractAddress}. Closing the contract") close_expire_contract(contractStructure, 'closed', transaction_data['txid'], blockinfo['height'], blockinfo['hash'], activecontracts_table_info.incorporationDate, blockinfo['time'], blockinfo['time'], query.time, query.activity, query.contractName, query.contractAddress, query.contractType, query.tokens_db, query.parsed_data, blockinfo['height']) return @@ -1334,17 +986,13 @@ def process_contract_time_trigger(blockinfo, systemdb_session, active_contracts) rows = connection.execute('SELECT tokenAmount FROM contractparticipants').fetchall() # Sum up using Decimal tokenAmount_sum = float(sum(Decimal(f"{row[0]}") for row in rows)) - logger.info(f"Triggering the contract {query.contractName}-{query.contractAddress}") - success_returnval = trigger_internal_contract_onvalue(tokenAmount_sum, contractStructure, transaction_data, blockinfo, parsed_data, connection, contract_name=query.contractName, contract_address=query.contractAddress, transaction_subType='expiryTime') + success_returnval = trigger_internal_contract(tokenAmount_sum, contractStructure, transaction_data, blockinfo, parsed_data, connection, contract_name=query.contractName, contract_address=query.contractAddress, transaction_subType='expiryTime') if not success_returnval: return 0 - logger.info(f"Closing the contract {query.contractName}-{query.contractAddress}") close_expire_contract(contractStructure, 'closed', transaction_data['txid'], blockinfo['height'], blockinfo['hash'], activecontracts_table_info.incorporationDate, blockinfo['time'], blockinfo['time'], query.time, query.activity, query.contractName, query.contractAddress, query.contractType, query.tokens_db, query.parsed_data, blockinfo['height']) return - -def process_contract_deposit_trigger(blockinfo, systemdb_session, active_deposits): for query in active_deposits: query_time = convert_datetime_to_arrowobject(query.time) blocktime = parsing.arrow.get(blockinfo['time']).to('Asia/Kolkata') @@ -1371,7 +1019,6 @@ def process_contract_deposit_trigger(blockinfo, systemdb_session, active_deposit transaction_data['txid'] = query.transactionHash transaction_data['blockheight'] = blockinfo['height'] transaction_data['time'] = blockinfo['time'] - logger.info(f"Initiating smartContractDepositReturn after time expiry {query_time} for {depositorAddress} with amount {returnAmount} {sellingToken}# from {query.contractName}-{query.contractAddress} contract ") returnval = transferToken(sellingToken, returnAmount, query.contractAddress, depositorAddress, transaction_data=transaction_data, parsed_data=parsed_data, blockinfo=blockinfo) if returnval == 0: logger.critical("Something went wrong in the token transfer method while return contract deposit. THIS IS CRITICAL ERROR") @@ -1388,7 +1035,7 @@ def process_contract_deposit_trigger(blockinfo, systemdb_session, active_deposit blockNumber = blockinfo['height'], blockHash = blockinfo['hash'] )) - logger.info(f"Successfully processed smartContractDepositReturn transaction ID {query.transactionHash} after time expiry {query_time} for {depositorAddress} with amount {returnAmount} {sellingToken}# from {query.contractName}-{query.contractAddress} contract ") + add_contract_transaction_history(contract_name=query.contractName, contract_address=query.contractAddress, transactionType='smartContractDepositReturn', transactionSubType=None, sourceFloAddress=query.contractAddress, destFloAddress=depositorAddress, transferAmount=returnAmount, blockNumber=blockinfo['height'], blockHash=blockinfo['hash'], blocktime=blockinfo['time'], transactionHash=deposit_last_latest_entry.transactionHash, jsonData=json.dumps(transaction_data), parsedFloData=json.dumps(parsed_data)) systemdb_session.add(TimeActions( @@ -1409,22 +1056,6 @@ def process_contract_deposit_trigger(blockinfo, systemdb_session, active_deposit updateLatestTransaction(transaction_data, parsed_data, f"{query.contractName}-{query.contractAddress}") -def checkLocal_time_expiry_trigger_deposit(blockinfo): - # Connect to system.db with a session - while True: - try: - systemdb_session = create_database_session_orm('system_dbs', {'db_name':'system'}, SystemBase) - - break - except: - time.sleep(DB_RETRY_TIMEOUT) - timeactions_tx_hashes = [] - active_contracts = return_time_active_contracts(systemdb_session) - active_deposits = return_time_active_deposits(systemdb_session) - - process_contract_time_trigger(blockinfo, systemdb_session, active_contracts) - process_contract_deposit_trigger(blockinfo, systemdb_session, active_deposits) - def check_reorg(): connection = create_database_connection('system_dbs') blockbook_api_url = 'https://blockbook.ranchimall.net/' @@ -1483,7 +1114,17 @@ def extract_contractStructure(contractName, contractAddress): return contractStructure -def process_flo_checks(transaction_data): +def processTransaction(transaction_data, parsed_data, blockinfo): + # Do the necessary checks for the inputs and outputs + # todo Rule 38 - Here we are doing FLO processing. We attach asset amounts to a FLO address, so every FLO address + # will have multiple feed ins of the asset. Each of those feedins will be an input to the address. + # an address can also spend the asset. Each of those spends is an output of that address feeding the asset into some + # other address an as input + # Rule 38 reframe - For checking any asset transfer on the flo blockchain it is possible that some transactions may use more than one + # vins. However in any single transaction the system considers valid, they can be only one source address from which the flodata is + # originting. To ensure consistency, we will have to check that even if there are more than one vins in a transaction, there should be + # exactly one FLO address on the originating side and that FLO address should be the owner of the asset tokens being transferred + # Create vinlist and outputlist vinlist = [] querylist = [] @@ -1494,14 +1135,14 @@ def process_flo_checks(transaction_data): totalinputval = float(transaction_data["valueIn"]) - # Check if all the addresses in a transaction on the input side are the same + # todo Rule 41 - Check if all the addresses in a transaction on the input side are the same for idx, item in enumerate(vinlist): if idx == 0: temp = item[0] continue if item[0] != temp: logger.info(f"System has found more than one address as part of vin. Transaction {transaction_data['txid']} is rejected") - return None, None, None + return 0 inputlist = [vinlist[0][0], totalinputval] inputadd = vinlist[0][0] @@ -1509,7 +1150,7 @@ def process_flo_checks(transaction_data): # Check if the number of vout is more than 2 (Rule 42) if len(transaction_data["vout"]) > 2: logger.info(f"System has found more than 2 addresses as part of vout. Transaction {transaction_data['txid']} is rejected") - return None, None, None + return 0 # Extract output addresses (Rule 43) outputlist = [] @@ -1529,681 +1170,657 @@ def process_flo_checks(transaction_data): outputlist = [inputlist[0]] elif len(outputlist) != 1: logger.info(f"Transaction's change is not coming back to the input address. Transaction {transaction_data['txid']} is rejected") - return None, None, None + return 0 else: outputlist = outputlist[0] - return inputlist, outputlist, inputadd + logger.info(f"Input address list : {inputlist}") + logger.info(f"Output address list : {outputlist}") + transaction_data['senderAddress'] = inputlist[0] + transaction_data['receiverAddress'] = outputlist[0] -def process_token_transfer(parsed_data, transaction_data, blockinfo,inputlist, outputlist, inputadd): - if not is_a_contract_address(inputlist[0]) and not is_a_contract_address(outputlist[0]): - # check if the token exists in the database - if check_database_existence('token', {'token_name':f"{parsed_data['tokenIdentification']}"}): - # Pull details of the token type from system.db database - connection = create_database_connection('system_dbs', {'db_name':'system'}) - db_details = connection.execute("SELECT db_name, db_type, keyword, object_format FROM databaseTypeMapping WHERE db_name='{}'".format(parsed_data['tokenIdentification'])) - db_details = list(zip(*db_details)) - if db_details[1][0] == 'infinite-token': - db_object = json.loads(db_details[3][0]) - if db_object['root_address'] == inputlist[0]: - isInfiniteToken = True - else: - isInfiniteToken = False - else: - isInfiniteToken = False - - # Check if the transaction hash already exists in the token db - connection = create_database_connection('token', {'token_name':f"{parsed_data['tokenIdentification']}"}) - blockno_txhash = connection.execute('SELECT blockNumber, transactionHash FROM transactionHistory').fetchall() - connection.close() - blockno_txhash_T = list(zip(*blockno_txhash)) - - if transaction_data['txid'] in list(blockno_txhash_T[1]): - logger.warning(f"Transaction {transaction_data['txid']} already exists in the token db. This is unusual, please check your code") - pushData_SSEapi(f"Error | Transaction {transaction_data['txid']} already exists in the token db. This is unusual, please check your code") - return 0 - - returnval = transferToken(parsed_data['tokenIdentification'], parsed_data['tokenAmount'], inputlist[0],outputlist[0], transaction_data, parsed_data, isInfiniteToken=isInfiniteToken, blockinfo = blockinfo) - if returnval == 0: - logger.info("Something went wrong in the token transfer method") - pushData_SSEapi(f"Error | Something went wrong while doing the internal db transactions for {transaction_data['txid']}") - return 0 - else: - updateLatestTransaction(transaction_data, parsed_data, f"{parsed_data['tokenIdentification']}", transactionType='token-transfer') - - # If this is the first interaction of the outputlist's address with the given token name, add it to token mapping - connection = create_database_connection('system_dbs', {'db_name':'system'}) - firstInteractionCheck = connection.execute(f"SELECT * FROM tokenAddressMapping WHERE tokenAddress='{outputlist[0]}' AND token='{parsed_data['tokenIdentification']}'").fetchall() - - if len(firstInteractionCheck) == 0: - connection.execute(f"INSERT INTO tokenAddressMapping (tokenAddress, token, transactionHash, blockNumber, blockHash) VALUES ('{outputlist[0]}', '{parsed_data['tokenIdentification']}', '{transaction_data['txid']}', '{transaction_data['blockheight']}', '{transaction_data['blockhash']}')") - - connection.close() - - # Pass information to SSE channel - headers = {'Accept': 'application/json', 'Content-Type': 'application/json'} - # r = requests.post(tokenapi_sse_url, json={f"message': 'Token Transfer | name:{parsed_data['tokenIdentification']} | transactionHash:{transaction_data['txid']}"}, headers=headers) - return 1 - else: - rejectComment = f"Token transfer at transaction {transaction_data['txid']} rejected as a token with the name {parsed_data['tokenIdentification']} doesnt not exist" - logger.info(rejectComment) - rejected_transaction_history(transaction_data, parsed_data, inputadd, outputlist[0], rejectComment) - pushData_SSEapi(rejectComment) - return 0 + # All FLO checks completed at this point. + # Semantic rules for parsed data begins - else: - rejectComment = f"Token transfer at transaction {transaction_data['txid']} rejected as either the input address or the output address is part of a contract address" - logger.info(rejectComment) - rejected_transaction_history(transaction_data, parsed_data, inputadd, outputlist[0], rejectComment) - pushData_SSEapi(rejectComment) - return 0 + # todo Rule 44 - Process as per the type of transaction + if parsed_data['type'] == 'transfer': + logger.info(f"Transaction {transaction_data['txid']} is of the type transfer") + # todo Rule 45 - If the transfer type is token, then call the function transferToken to adjust the balances + if parsed_data['transferType'] == 'token': + if not is_a_contract_address(inputlist[0]) and not is_a_contract_address(outputlist[0]): + # check if the token exists in the database + if check_database_existence('token', {'token_name':f"{parsed_data['tokenIdentification']}"}): + # Pull details of the token type from system.db database + connection = create_database_connection('system_dbs', {'db_name':'system'}) + db_details = connection.execute("SELECT db_name, db_type, keyword, object_format FROM databaseTypeMapping WHERE db_name='{}'".format(parsed_data['tokenIdentification'])) + db_details = list(zip(*db_details)) + if db_details[1][0] == 'infinite-token': + db_object = json.loads(db_details[3][0]) + if db_object['root_address'] == inputlist[0]: + isInfiniteToken = True + else: + isInfiniteToken = False + else: + isInfiniteToken = False -def process_one_time_event_transfer(parsed_data, transaction_data, blockinfo,inputlist, outputlist, inputadd,connection,session,contractStructure): - logger.info(f"Processing one-time event transfer for transaction {transaction_data['txid']}") + # Check if the transaction hash already exists in the token db + connection = create_database_connection('token', {'token_name':f"{parsed_data['tokenIdentification']}"}) + blockno_txhash = connection.execute('SELECT blockNumber, transactionHash FROM transactionHistory').fetchall() + connection.close() + blockno_txhash_T = list(zip(*blockno_txhash)) - # Check if the transaction hash already exists in the contract db (Safety check) - participantAdd_txhash = connection.execute('SELECT participantAddress, transactionHash FROM contractparticipants').fetchall() - participantAdd_txhash_T = list(zip(*participantAdd_txhash)) + if transaction_data['txid'] in list(blockno_txhash_T[1]): + logger.warning(f"Transaction {transaction_data['txid']} already exists in the token db. This is unusual, please check your code") + pushData_SSEapi(f"Error | Transaction {transaction_data['txid']} already exists in the token db. This is unusual, please check your code") + return 0 - if len(participantAdd_txhash) != 0 and transaction_data['txid'] in list(participantAdd_txhash_T[1]): - logger.warning(f"Transaction {transaction_data['txid']} rejected as it already exists in the Smart Contract db. This is unusual, please check your code") - pushData_SSEapi(f"Error | Transaction {transaction_data['txid']} rejected as it already exists in the Smart Contract db. This is unusual, please check your code") - return 0 - - # If contractAddress was passed, then check if it matches the output address of this contract - if 'contractAddress' in parsed_data: - if parsed_data['contractAddress'] != outputlist[0]: - rejectComment = f"Contract participation at transaction {transaction_data['txid']} rejected as contractAddress specified in flodata, {parsed_data['contractAddress']}, does not match with transaction's output address {outputlist[0]}" - logger.info(rejectComment) - rejected_contract_transaction_history(transaction_data, parsed_data, 'participation', outputlist[0], inputadd, outputlist[0], rejectComment) - pushData_SSEapi(f"Error| Mismatch in contract address specified in flodata and the output address of the transaction {transaction_data['txid']}") - return 0 - - # Check the status of the contract - contractStatus = check_contract_status(parsed_data['contractName'], outputlist[0]) - - if contractStatus == 'closed': - rejectComment = f"Transaction {transaction_data['txid']} rejected as Smart contract {parsed_data['contractName']} at the {outputlist[0]} is closed" - logger.info(rejectComment) - rejected_contract_transaction_history(transaction_data, parsed_data, 'participation', outputlist[0], inputadd, outputlist[0], rejectComment) - return 0 - else: - session = create_database_session_orm('smart_contract', {'contract_name': f"{parsed_data['contractName']}", 'contract_address': f"{outputlist[0]}"}, ContractBase) - result = session.query(ContractStructure).filter_by(attribute='expiryTime').all() - session.close() - if result: - # Now parse the expiry time in Python - expirytime = result[0].value.strip() - expirytime_split = expirytime.split(' ') - parse_string = '{}/{}/{} {}'.format(expirytime_split[3], parsing.months[expirytime_split[1]], expirytime_split[2], expirytime_split[4]) - expirytime_object = parsing.arrow.get(parse_string, 'YYYY/M/D HH:mm:ss').replace(tzinfo=expirytime_split[5][3:]) - blocktime_object = parsing.arrow.get(transaction_data['time']).to('Asia/Kolkata') - - if blocktime_object > expirytime_object: - rejectComment = f"Transaction {transaction_data['txid']} rejected as Smart contract {parsed_data['contractName']}-{outputlist[0]} has expired and will not accept any user participation" - logger.info(rejectComment) - rejected_contract_transaction_history(transaction_data, parsed_data, 'participation', outputlist[0], inputadd, outputlist[0], rejectComment) - pushData_SSEapi(rejectComment) - return 0 - - # Check if user choice has been passed to the wrong contract type - if 'userChoice' in parsed_data and 'exitconditions' not in contractStructure: - rejectComment = f"Transaction {transaction_data['txid']} rejected as userChoice, {parsed_data['userChoice']}, has been passed to Smart Contract named {parsed_data['contractName']} at the address {outputlist[0]} which doesn't accept any userChoice" - logger.info(rejectComment) - rejected_contract_transaction_history(transaction_data, parsed_data, 'participation', outputlist[0], inputadd, outputlist[0], rejectComment) - pushData_SSEapi(rejectComment) - return 0 - - # Check if the right token is being sent for participation - if parsed_data['tokenIdentification'] != contractStructure['tokenIdentification']: - rejectComment = f"Transaction {transaction_data['txid']} rejected as the token being transferred, {parsed_data['tokenIdentification'].upper()}, is not part of the structure of Smart Contract named {parsed_data['contractName']} at the address {outputlist[0]}" - logger.info(rejectComment) - rejected_contract_transaction_history(transaction_data, parsed_data, 'participation', outputlist[0], inputadd, outputlist[0], rejectComment) - pushData_SSEapi(rejectComment) - return 0 - - # Check if contractAmount is part of the contract structure, and enforce it if it is - if 'contractAmount' in contractStructure: - if float(contractStructure['contractAmount']) != float(parsed_data['tokenAmount']): - rejectComment = f"Transaction {transaction_data['txid']} rejected as contractAmount being transferred is not part of the structure of Smart Contract named {parsed_data['contractName']} at the address {outputlist[0]}" - logger.info(rejectComment) - rejected_contract_transaction_history(transaction_data, parsed_data, 'participation', outputlist[0], inputadd, outputlist[0], rejectComment) - pushData_SSEapi(rejectComment) - return 0 - - partialTransferCounter = 0 - # Check if maximum subscription amount has been reached - if 'maximumsubscriptionamount' in contractStructure: - # Now parse the expiry time in Python - maximumsubscriptionamount = float(contractStructure['maximumsubscriptionamount']) - session = create_database_session_orm('smart_contract', {'contract_name': f"{parsed_data['contractName']}", 'contract_address': f"{outputlist[0]}"}, ContractBase) - - query_data = session.query(ContractParticipants.tokenAmount).all() - amountDeposited = sum(Decimal(f"{amount[0]}") if amount[0] is not None else Decimal(0) for amount in query_data) - - session.close() - - if amountDeposited is None: - amountDeposited = 0 - - if amountDeposited >= maximumsubscriptionamount: - rejectComment = f"Transaction {transaction_data['txid']} rejected as maximum subscription amount has been reached for the Smart contract named {parsed_data['contractName']} at the address {outputlist[0]}" - logger.info(rejectComment) - rejected_contract_transaction_history(transaction_data, parsed_data, 'participation', outputlist[0], inputadd, outputlist[0], rejectComment) - pushData_SSEapi(rejectComment) - return 0 - elif (perform_decimal_operation('addition', float(amountDeposited), float(parsed_data['tokenAmount'])) > maximumsubscriptionamount): - if 'contractAmount' in contractStructure: - rejectComment = f"Transaction {transaction_data['txid']} rejected as the contractAmount surpasses the maximum subscription amount, {contractStructure['maximumsubscriptionamount']} {contractStructure['tokenIdentification'].upper()}, for the Smart contract named {parsed_data['contractName']} at the address {outputlist[0]}" - logger.info(rejectComment) - rejected_contract_transaction_history(transaction_data, parsed_data, 'participation', outputlist[0], inputadd, outputlist[0], rejectComment) - pushData_SSEapi(rejectComment) - return 0 - else: - partialTransferCounter = 1 - rejectComment = f"Transaction {transaction_data['txid']} rejected as the partial transfer of token {contractStructure['tokenIdentification'].upper()} is not allowed, for the Smart contract named {parsed_data['contractName']} at the address {outputlist[0]}" - logger.info(rejectComment) - rejected_contract_transaction_history(transaction_data, parsed_data, 'participation', outputlist[0], inputadd, outputlist[0], rejectComment) - pushData_SSEapi(rejectComment) - return 0 - - # Check if exitcondition exists as part of contract structure and is given in right format - if 'exitconditions' in contractStructure: - # This means the contract has an external trigger, ie. trigger coming from the contract committee - exitconditionsList = [] - for condition in contractStructure['exitconditions']: - exitconditionsList.append(contractStructure['exitconditions'][condition]) - - if parsed_data['userChoice'] in exitconditionsList: - if partialTransferCounter == 0: - # Check if the tokenAmount being transferred exists in the address & do the token transfer - returnval = transferToken(parsed_data['tokenIdentification'], parsed_data['tokenAmount'], inputlist[0], outputlist[0], transaction_data, parsed_data, blockinfo=blockinfo) - if returnval != 0: - # Store participant details in the smart contract's db - session.add(ContractParticipants(participantAddress=inputadd, - tokenAmount=parsed_data['tokenAmount'], - userChoice=parsed_data['userChoice'], - transactionHash=transaction_data['txid'], - blockNumber=transaction_data['blockheight'], - blockHash=transaction_data['blockhash'])) - session.commit() - - # Store transfer as part of ContractTransactionHistory - add_contract_transaction_history(contract_name=parsed_data['contractName'], contract_address=outputlist[0], transactionType='participation', transactionSubType=None, sourceFloAddress=inputadd, destFloAddress=outputlist[0], transferAmount=parsed_data['tokenAmount'], blockNumber=blockinfo['height'], blockHash=blockinfo['hash'], blocktime=blockinfo['time'], transactionHash=transaction_data['txid'], jsonData=json.dumps(transaction_data), parsedFloData=json.dumps(parsed_data)) - - # Store a mapping of participant address -> Contract participated in - system_session = create_database_session_orm('system_dbs', {'db_name': "system"}, SystemBase) - system_session.add(ContractAddressMapping(address=inputadd, addressType='participant', - tokenAmount=parsed_data['tokenAmount'], - contractName=parsed_data['contractName'], - contractAddress=outputlist[0], - transactionHash=transaction_data['txid'], - blockNumber=transaction_data['blockheight'], - blockHash=transaction_data['blockhash'])) - system_session.commit() + returnval = transferToken(parsed_data['tokenIdentification'], parsed_data['tokenAmount'], inputlist[0],outputlist[0], transaction_data, parsed_data, isInfiniteToken=isInfiniteToken, blockinfo = blockinfo) + if returnval == 0: + logger.info("Something went wrong in the token transfer method") + pushData_SSEapi(f"Error | Something went wrong while doing the internal db transactions for {transaction_data['txid']}") + return 0 + else: + updateLatestTransaction(transaction_data, parsed_data, f"{parsed_data['tokenIdentification']}", transactionType='token-transfer') # If this is the first interaction of the outputlist's address with the given token name, add it to token mapping - connection = create_database_connection('system_dbs', {'db_name': 'system'}) + connection = create_database_connection('system_dbs', {'db_name':'system'}) firstInteractionCheck = connection.execute(f"SELECT * FROM tokenAddressMapping WHERE tokenAddress='{outputlist[0]}' AND token='{parsed_data['tokenIdentification']}'").fetchall() + if len(firstInteractionCheck) == 0: connection.execute(f"INSERT INTO tokenAddressMapping (tokenAddress, token, transactionHash, blockNumber, blockHash) VALUES ('{outputlist[0]}', '{parsed_data['tokenIdentification']}', '{transaction_data['txid']}', '{transaction_data['blockheight']}', '{transaction_data['blockhash']}')") + connection.close() - updateLatestTransaction(transaction_data, parsed_data, f"{parsed_data['contractName']}-{outputlist[0]}", transactionType='ote-externaltrigger-participation') + + # Pass information to SSE channel + headers = {'Accept': 'application/json', 'Content-Type': 'application/json'} + # r = requests.post(tokenapi_sse_url, json={f"message': 'Token Transfer | name:{parsed_data['tokenIdentification']} | transactionHash:{transaction_data['txid']}"}, headers=headers) return 1 + else: + rejectComment = f"Token transfer at transaction {transaction_data['txid']} rejected as a token with the name {parsed_data['tokenIdentification']} doesnt not exist" + logger.info(rejectComment) + rejected_transaction_history(transaction_data, parsed_data, inputadd, outputlist[0], rejectComment) + pushData_SSEapi(rejectComment) + return 0 + + else: + rejectComment = f"Token transfer at transaction {transaction_data['txid']} rejected as either the input address or the output address is part of a contract address" + logger.info(rejectComment) + rejected_transaction_history(transaction_data, parsed_data, inputadd, outputlist[0], rejectComment) + pushData_SSEapi(rejectComment) + return 0 + + # todo Rule 46 - If the transfer type is smart contract, then call the function transferToken to do sanity checks & lock the balance + elif parsed_data['transferType'] == 'smartContract': + if check_database_existence('smart_contract', {'contract_name':f"{parsed_data['contractName']}", 'contract_address':f"{outputlist[0]}"}): + # Check type of contract and categorize between into ote-participation or continuous-event participation + # todo - replace all connection queries with session queries + connection = create_database_connection('smart_contract', {'contract_name':f"{parsed_data['contractName']}", 'contract_address':f"{outputlist[0]}"}) + contract_session = create_database_session_orm('smart_contract', {'contract_name':f"{parsed_data['contractName']}", 'contract_address':f"{outputlist[0]}"}, ContractBase) + contract_type = contract_session.query(ContractStructure.value).filter(ContractStructure.attribute == 'contractType').first()[0] + + contractStructure = extract_contractStructure(parsed_data['contractName'], outputlist[0]) + + if contract_type == 'one-time-event': + # Check if the transaction hash already exists in the contract db (Safety check) + participantAdd_txhash = connection.execute('SELECT participantAddress, transactionHash FROM contractparticipants').fetchall() + participantAdd_txhash_T = list(zip(*participantAdd_txhash)) + + if len(participantAdd_txhash) != 0 and transaction_data['txid'] in list(participantAdd_txhash_T[1]): + logger.warning(f"Transaction {transaction_data['txid']} rejected as it already exists in the Smart Contract db. This is unusual, please check your code") + pushData_SSEapi(f"Error | Transaction {transaction_data['txid']} rejected as it already exists in the Smart Contract db. This is unusual, please check your code") + return 0 + + # if contractAddress was passed, then check if it matches the output address of this contract + if 'contractAddress' in parsed_data: + if parsed_data['contractAddress'] != outputlist[0]: + rejectComment = f"Contract participation at transaction {transaction_data['txid']} rejected as contractAddress specified in flodata, {parsed_data['contractAddress']}, doesnt not match with transaction's output address {outputlist[0]}" + logger.info(rejectComment) + # Store transfer as part of RejectedContractTransactionHistory + rejected_contract_transaction_history(transaction_data, parsed_data, 'participation', outputlist[0], inputadd, outputlist[0], rejectComment) + # Pass information to SSE channel + pushData_SSEapi(f"Error| Mismatch in contract address specified in flodata and the output address of the transaction {transaction_data['txid']}") + return 0 + + # check the status of the contract + contractStatus = check_contract_status(parsed_data['contractName'], outputlist[0]) + contractList = [] + + if contractStatus == 'closed': + rejectComment = f"Transaction {transaction_data['txid']} closed as Smart contract {parsed_data['contractName']} at the {outputlist[0]} is closed" + logger.info(rejectComment) + rejected_contract_transaction_history(transaction_data, parsed_data, 'participation', outputlist[0], inputadd, outputlist[0], rejectComment) + return 0 + else: + session = create_database_session_orm('smart_contract', {'contract_name': f"{parsed_data['contractName']}", 'contract_address': f"{outputlist[0]}"}, ContractBase) + result = session.query(ContractStructure).filter_by(attribute='expiryTime').all() + session.close() + if result: + # now parse the expiry time in python + expirytime = result[0].value.strip() + expirytime_split = expirytime.split(' ') + parse_string = '{}/{}/{} {}'.format(expirytime_split[3], parsing.months[expirytime_split[1]], expirytime_split[2], expirytime_split[4]) + expirytime_object = parsing.arrow.get(parse_string, 'YYYY/M/D HH:mm:ss').replace(tzinfo=expirytime_split[5][3:]) + blocktime_object = parsing.arrow.get(transaction_data['time']).to('Asia/Kolkata') + + if blocktime_object > expirytime_object: + rejectComment = f"Transaction {transaction_data['txid']} rejected as Smart contract {parsed_data['contractName']}-{outputlist[0]} has expired and will not accept any user participation" + logger.info(rejectComment) + rejected_contract_transaction_history(transaction_data, parsed_data, 'participation', outputlist[0], inputadd, outputlist[0], rejectComment) + pushData_SSEapi(rejectComment) + return 0 + + # check if user choice has been passed, to the wrong contract type + if 'userChoice' in parsed_data and 'exitconditions' not in contractStructure: + rejectComment = f"Transaction {transaction_data['txid']} rejected as userChoice, {parsed_data['userChoice']}, has been passed to Smart Contract named {parsed_data['contractName']} at the address {outputlist[0]} which doesn't accept any userChoice" + logger.info(rejectComment) + rejected_contract_transaction_history(transaction_data, parsed_data, 'participation', outputlist[0], inputadd, outputlist[0], rejectComment) + pushData_SSEapi(rejectComment) + return 0 + + # check if the right token is being sent for participation + if parsed_data['tokenIdentification'] != contractStructure['tokenIdentification']: + rejectComment = f"Transaction {transaction_data['txid']} rejected as the token being transferred, {parsed_data['tokenIdentidication'].upper()}, is not part of the structure of Smart Contract named {parsed_data['contractName']} at the address {outputlist[0]}" + logger.info(rejectComment) + rejected_contract_transaction_history(transaction_data, parsed_data, 'participation', outputlist[0], inputadd, outputlist[0], rejectComment) + pushData_SSEapi(rejectComment) + return 0 + + # Check if contractAmount is part of the contract structure, and enforce it if it is + if 'contractAmount' in contractStructure: + if float(contractStructure['contractAmount']) != float(parsed_data['tokenAmount']): + rejectComment = f"Transaction {transaction_data['txid']} rejected as contractAmount being transferred is not part of the structure of Smart Contract named {parsed_data['contractName']} at the address {outputlist[0]}" + logger.info(rejectComment) + rejected_contract_transaction_history(transaction_data, parsed_data, 'participation', outputlist[0], inputadd, outputlist[0], rejectComment) + pushData_SSEapi(rejectComment) + return 0 + + partialTransferCounter = 0 + # Check if maximum subscription amount has reached + if 'maximumsubscriptionamount' in contractStructure: + # now parse the expiry time in python + maximumsubscriptionamount = float(contractStructure['maximumsubscriptionamount']) + session = create_database_session_orm('smart_contract', {'contract_name': f"{parsed_data['contractName']}", 'contract_address': f"{outputlist[0]}"}, ContractBase) + # amountDeposited = session.query(func.sum(ContractParticipants.tokenAmount)).all()[0][0] + + query_data = session.query(ContractParticipants.tokenAmount).all() + amountDeposited = sum(Decimal(f"{amount[0]}") if amount[0] is not None else Decimal(0) for amount in query_data) + + session.close() + + if amountDeposited is None: + amountDeposited = 0 + + if amountDeposited >= maximumsubscriptionamount: + rejectComment = f"Transaction {transaction_data['txid']} rejected as maximum subscription amount has been reached for the Smart contract named {parsed_data['contractName']} at the address {outputlist[0]}" + logger.info(rejectComment) + rejected_contract_transaction_history(transaction_data, parsed_data, 'participation', outputlist[0], inputadd, outputlist[0], rejectComment) + pushData_SSEapi(rejectComment) + return 0 + elif (perform_decimal_operation('addition', float(amountDeposited), float(parsed_data['tokenAmount'])) > maximumsubscriptionamount): + if 'contractAmount' in contractStructure: + rejectComment = f"Transaction {transaction_data['txid']} rejected as the contractAmount surpasses the maximum subscription amount, {contractStructure['maximumsubscriptionamount']} {contractStructure['tokenIdentification'].upper()}, for the Smart contract named {parsed_data['contractName']} at the address {outputlist[0]}" + logger.info(rejectComment) + rejected_contract_transaction_history(transaction_data, parsed_data, 'participation', outputlist[0], inputadd, outputlist[0], rejectComment) + pushData_SSEapi(rejectComment) + return 0 + else: + partialTransferCounter = 1 + rejectComment = f"Transaction {transaction_data['txid']} rejected as the partial transfer of token {contractStructure['tokenIdentification'].upper()} is not allowed, for the Smart contract named {parsed_data['contractName']} at the address {outputlist[0]}" + logger.info(rejectComment) + rejected_contract_transaction_history(transaction_data, parsed_data, 'participation', outputlist[0], inputadd, outputlist[0], rejectComment) + pushData_SSEapi(rejectComment) + return 0 + + # Check if exitcondition exists as part of contractstructure and is given in right format + if 'exitconditions' in contractStructure: + # This means the contract has an external trigger, ie. trigger coming from the contract committee + exitconditionsList = [] + for condition in contractStructure['exitconditions']: + exitconditionsList.append(contractStructure['exitconditions'][condition]) + + if parsed_data['userChoice'] in exitconditionsList: + if partialTransferCounter == 0: + # Check if the tokenAmount being transferred exists in the address & do the token transfer + returnval = transferToken(parsed_data['tokenIdentification'], parsed_data['tokenAmount'], inputlist[0], outputlist[0], transaction_data, parsed_data, blockinfo = blockinfo) + if returnval != 0: + # Store participant details in the smart contract's db + session.add(ContractParticipants(participantAddress=inputadd, + tokenAmount=parsed_data['tokenAmount'], + userChoice=parsed_data['userChoice'], + transactionHash=transaction_data['txid'], + blockNumber=transaction_data['blockheight'], + blockHash=transaction_data['blockhash'])) + session.commit() + + # Store transfer as part of ContractTransactionHistory + add_contract_transaction_history(contract_name=parsed_data['contractName'], contract_address=outputlist[0], transactionType='participation', transactionSubType=None, sourceFloAddress=inputadd, destFloAddress=outputlist[0], transferAmount=parsed_data['tokenAmount'], blockNumber=blockinfo['height'], blockHash=blockinfo['hash'], blocktime=blockinfo['time'], transactionHash=transaction_data['txid'], jsonData=json.dumps(transaction_data), parsedFloData=json.dumps(parsed_data)) + + # Store a mapping of participant address -> Contract participated in + session = create_database_session_orm('system_dbs', {'db_name': "system"}, SystemBase) + session.add(ContractAddressMapping(address=inputadd, addressType='participant', + tokenAmount=parsed_data['tokenAmount'], + contractName=parsed_data['contractName'], + contractAddress=outputlist[0], + transactionHash=transaction_data['txid'], + blockNumber=transaction_data['blockheight'], + blockHash=transaction_data['blockhash'])) + session.commit() + + # If this is the first interaction of the outputlist's address with the given token name, add it to token mapping + connection = create_database_connection('system_dbs', {'db_name':'system'}) + firstInteractionCheck = connection.execute(f"SELECT * FROM tokenAddressMapping WHERE tokenAddress='{outputlist[0]}' AND token='{parsed_data['tokenIdentification']}'").fetchall() + if len(firstInteractionCheck) == 0: + connection.execute(f"INSERT INTO tokenAddressMapping (tokenAddress, token, transactionHash, blockNumber, blockHash) VALUES ('{outputlist[0]}', '{parsed_data['tokenIdentification']}', '{transaction_data['txid']}', '{transaction_data['blockheight']}', '{transaction_data['blockhash']}')") + connection.close() + updateLatestTransaction(transaction_data, parsed_data, f"{parsed_data['contractName']}-{outputlist[0]}", transactionType='ote-externaltrigger-participation') + return 1 + + else: + logger.info("Something went wrong in the smartcontract token transfer method") + return 0 + elif partialTransferCounter == 1: + # Transfer only part of the tokens users specified, till the time it reaches maximumamount + returnval = transferToken(parsed_data['tokenIdentification'], perform_decimal_operation('subtraction', maximumsubscriptionamount, amountDeposited), inputlist[0], outputlist[0], transaction_data, parsed_data, blockinfo = blockinfo) + if returnval != 0: + # Store participant details in the smart contract's db + session.add(ContractParticipants(participantAddress=inputadd, + tokenAmount=perform_decimal_operation('subtraction', maximumsubscriptionamount, amountDeposited), + userChoice=parsed_data['userChoice'], + transactionHash=transaction_data['txid'], + blockNumber=transaction_data['blockheight'], + blockHash=transaction_data['blockhash'])) + session.commit() + session.close() + + # Store transfer as part of ContractTransactionHistory + add_contract_transaction_history(contract_name=parsed_data['contractName'], contract_address=outputlist[0], transactionType='participation', transactionSubType=None, sourceFloAddress=inputadd, destFloAddress=outputlist[0], transferAmount=perform_decimal_operation('subtraction', maximumsubscriptionamount, amountDeposited), blockNumber=blockinfo['height'], blockHash=blockinfo['hash'], blocktime=blockinfo['time'], transactionHash=transaction_data['txid'], jsonData=json.dumps(transaction_data), parsedFloData=json.dumps(parsed_data)) + + # Store a mapping of participant address -> Contract participated in + session = create_database_session_orm('system_dbs', {'db_name': "system"}, SystemBase) + session.add(ContractAddressMapping(address=inputadd, addressType='participant', + tokenAmount=perform_decimal_operation('subtraction', maximumsubscriptionamount, amountDeposited), + contractName=parsed_data['contractName'], + contractAddress=outputlist[0], + transactionHash=transaction_data['txid'], + blockNumber=transaction_data['blockheight'], + blockHash=transaction_data['blockhash'])) + session.commit() + session.close() + updateLatestTransaction(transaction_data, parsed_data, f"{parsed_data['contractName']}-{outputlist[0]}", transactionType='ote-externaltrigger-participation') + return 1 + + else: + logger.info("Something went wrong in the smartcontract token transfer method") + return 0 + + else: + rejectComment = f"Transaction {transaction_data['txid']} rejected as wrong userchoice entered for the Smart Contract named {parsed_data['contractName']} at the address {outputlist[0]}" + logger.info(rejectComment) + rejected_contract_transaction_history(transaction_data, parsed_data, 'participation', outputlist[0], inputadd, outputlist[0], rejectComment) + pushData_SSEapi(rejectComment) + return 0 + + elif 'payeeAddress' in contractStructure: + # this means the contract is of the type internal trigger + if partialTransferCounter == 0: + transferAmount = parsed_data['tokenAmount'] + elif partialTransferCounter == 1: + transferAmount = perform_decimal_operation('subtraction', maximumsubscriptionamount, amountDeposited) + + # Check if the tokenAmount being transferred exists in the address & do the token transfer + returnval = transferToken(parsed_data['tokenIdentification'], transferAmount, inputlist[0], outputlist[0], transaction_data, parsed_data, blockinfo = blockinfo) + if returnval != 0: + # Store participant details in the smart contract's db + session.add(ContractParticipants(participantAddress=inputadd, tokenAmount=transferAmount, userChoice='-', transactionHash=transaction_data['txid'], blockNumber=transaction_data['blockheight'], blockHash=transaction_data['blockhash'])) + + # Store transfer as part of ContractTransactionHistory + add_contract_transaction_history(contract_name=parsed_data['contractName'], contract_address=outputlist[0], transactionType='participation', transactionSubType=None, sourceFloAddress=inputadd, destFloAddress=outputlist[0], transferAmount=transferAmount, blockNumber=blockinfo['height'], blockHash=blockinfo['hash'], blocktime=blockinfo['time'], transactionHash=transaction_data['txid'], jsonData=json.dumps(transaction_data), parsedFloData=json.dumps(parsed_data)) + session.commit() + session.close() + + # Store a mapping of participant address -> Contract participated in + session = create_database_session_orm('system_dbs', {'db_name': "system"}, SystemBase) + session.add(ContractAddressMapping(address=inputadd, addressType='participant', + tokenAmount=transferAmount, + contractName=parsed_data['contractName'], + contractAddress=outputlist[0], + transactionHash=transaction_data['txid'], + blockNumber=transaction_data['blockheight'], + blockHash=transaction_data['blockhash'])) + session.commit() + + # If this is the first interaction of the outputlist's address with the given token name, add it to token mapping + connection = create_database_connection('system_dbs', {'db_name':'system'}) + firstInteractionCheck = connection.execute(f"SELECT * FROM tokenAddressMapping WHERE tokenAddress='{outputlist[0]}' AND token='{parsed_data['tokenIdentification']}'").fetchall() + if len(firstInteractionCheck) == 0: + connection.execute(f"INSERT INTO tokenAddressMapping (tokenAddress, token, transactionHash, blockNumber, blockHash) VALUES ('{outputlist[0]}', '{parsed_data['tokenIdentification']}', '{transaction_data['txid']}', '{transaction_data['blockheight']}', '{transaction_data['blockhash']}')") + connection.close() + updateLatestTransaction(transaction_data, parsed_data, f"{parsed_data['contractName']}-{outputlist[0]}", transactionType='ote-internaltrigger-participation') + return 1 + + else: + logger.info("Something went wrong in the smartcontract token transfer method") + return 0 + + elif contract_type == 'continuos-event': + contract_subtype = contract_session.query(ContractStructure.value).filter(ContractStructure.attribute == 'subtype').first()[0] + if contract_subtype == 'tokenswap': + # Check if the transaction hash already exists in the contract db (Safety check) + participantAdd_txhash = connection.execute('SELECT participantAddress, transactionHash FROM contractparticipants').fetchall() + participantAdd_txhash_T = list(zip(*participantAdd_txhash)) + + if len(participantAdd_txhash) != 0 and transaction_data['txid'] in list(participantAdd_txhash_T[1]): + logger.warning(f"Transaction {transaction_data['txid']} rejected as it already exists in the Smart Contract db. This is unusual, please check your code") + pushData_SSEapi(f"Error | Transaction {transaction_data['txid']} rejected as it already exists in the Smart Contract db. This is unusual, please check your code") + return 0 + + # if contractAddress was passed, then check if it matches the output address of this contract + if 'contractAddress' in parsed_data: + if parsed_data['contractAddress'] != outputlist[0]: + rejectComment = f"Contract participation at transaction {transaction_data['txid']} rejected as contractAddress specified in flodata, {parsed_data['contractAddress']}, doesnt not match with transaction's output address {outputlist[0]}" + logger.info(rejectComment) + rejected_contract_transaction_history(transaction_data, parsed_data, 'participation', outputlist[0], inputadd, outputlist[0], rejectComment) + # Pass information to SSE channel + pushData_SSEapi(f"Error| Mismatch in contract address specified in flodata and the output address of the transaction {transaction_data['txid']}") + return 0 + + if contractStructure['pricetype'] in ['predetermined','determined']: + swapPrice = float(contractStructure['price']) + elif contractStructure['pricetype'] == 'dynamic': + # Oracle address cannot be a participant in the contract. Check if the sender address is oracle address + if transaction_data['senderAddress'] == contractStructure['oracle_address']: + logger.warning(f"Transaction {transaction_data['txid']} rejected as the oracle addess {contractStructure['oracle_address']} is attempting to participate. Please report this to the contract owner") + pushData_SSEapi(f"Transaction {transaction_data['txid']} rejected as the oracle addess {contractStructure['oracle_address']} is attempting to participate. Please report this to the contract owner") + return 0 + + swapPrice = fetchDynamicSwapPrice(contractStructure, blockinfo) + + # swapAmount = float(parsed_data['tokenAmount'])/swapPrice + swapAmount = perform_decimal_operation('division', parsed_data['tokenAmount'], swapPrice) + + # Check if the swap amount is available in the deposits of the selling token + # if yes do the transfers, otherwise reject the transaction + # + subquery = contract_session.query(func.max(ContractDeposits.id)).group_by(ContractDeposits.transactionHash) + active_contract_deposits = contract_session.query(ContractDeposits).filter(ContractDeposits.id.in_(subquery)).filter(ContractDeposits.status != 'deposit-return').filter(ContractDeposits.status != 'consumed').filter(ContractDeposits.status == 'active').all() + + # todo - what is the role of the next line? cleanup if not useful + available_deposits = active_contract_deposits[:] + + # available_deposit_sum = contract_session.query(func.sum(ContractDeposits.depositBalance)).filter(ContractDeposits.id.in_(subquery)).filter(ContractDeposits.status != 'deposit-return').filter(ContractDeposits.status == 'active').all() + + query_data = contract_session.query(ContractDeposits.depositBalance).filter(ContractDeposits.id.in_(subquery)).filter(ContractDeposits.status != 'deposit-return').filter(ContractDeposits.status == 'active').all() + + available_deposit_sum = sum(Decimal(f"{amount[0]}") if amount[0] is not None else Decimal(0) for amount in query_data) + if available_deposit_sum==0 or available_deposit_sum[0][0] is None: + available_deposit_sum = 0 + else: + available_deposit_sum = float(available_deposit_sum[0][0]) + + if available_deposit_sum >= swapAmount: + # accepting token transfer from participant to smart contract address + returnval = transferToken(parsed_data['tokenIdentification'], parsed_data['tokenAmount'], inputlist[0], outputlist[0], transaction_data=transaction_data, parsed_data=parsed_data, isInfiniteToken=None, blockinfo=blockinfo, transactionType='tokenswapParticipation') + if returnval == 0: + logger.info("ERROR | Something went wrong in the token transfer method while doing local Smart Contract Particiaption") + return 0 + + # If this is the first interaction of the outputlist's address with the given token name, add it to token mapping + systemdb_connection = create_database_connection('system_dbs', {'db_name':'system'}) + firstInteractionCheck = systemdb_connection.execute(f"SELECT * FROM tokenAddressMapping WHERE tokenAddress='{outputlist[0]}' AND token='{parsed_data['tokenIdentification']}'").fetchall() + if len(firstInteractionCheck) == 0: + systemdb_connection.execute(f"INSERT INTO tokenAddressMapping (tokenAddress, token, transactionHash, blockNumber, blockHash) VALUES ('{outputlist[0]}', '{parsed_data['tokenIdentification']}', '{transaction_data['txid']}', '{transaction_data['blockheight']}', '{transaction_data['blockhash']}')") + systemdb_connection.close() + + + # ContractDepositTable + # For each unique deposit( address, expirydate, blocknumber) there will be 2 entries added to the table + # the consumption of the deposits will start form the top of the table + deposit_counter = 0 + remaining_amount = swapAmount + for a_deposit in available_deposits: + if a_deposit.depositBalance > remaining_amount: + # accepting token transfer from the contract to depositor's address + returnval = transferToken(contractStructure['accepting_token'], perform_decimal_operation('multiply', remaining_amount, swapPrice), contractStructure['contractAddress'], a_deposit.depositorAddress, transaction_data=transaction_data, parsed_data=parsed_data, isInfiniteToken=None, blockinfo=blockinfo, transactionType='tokenswapDepositSettlement') + if returnval == 0: + logger.info("CRITICAL ERROR | Something went wrong in the token transfer method while doing local Smart Contract Particiaption deposit swap operation") + return 0 + + # If this is the first interaction of the outputlist's address with the given token name, add it to token mapping + systemdb_connection = create_database_connection('system_dbs', {'db_name':'system'}) + firstInteractionCheck = systemdb_connection.execute(f"SELECT * FROM tokenAddressMapping WHERE tokenAddress='{a_deposit.depositorAddress}' AND token='{contractStructure['accepting_token']}'").fetchall() + if len(firstInteractionCheck) == 0: + systemdb_connection.execute(f"INSERT INTO tokenAddressMapping (tokenAddress, token, transactionHash, blockNumber, blockHash) VALUES ('{a_deposit.depositorAddress}', '{contractStructure['accepting_token']}', '{transaction_data['txid']}', '{transaction_data['blockheight']}', '{transaction_data['blockhash']}')") + systemdb_connection.close() + + + contract_session.add(ContractDeposits( depositorAddress= a_deposit.depositorAddress, + depositAmount= perform_decimal_operation('subtraction', 0, remaining_amount), + status='deposit-honor', + transactionHash= a_deposit.transactionHash, + blockNumber= blockinfo['height'], + blockHash= blockinfo['hash'])) + + # if the total is consumsed then the following entry won't take place + contract_session.add(ContractDeposits( depositorAddress= a_deposit.depositorAddress, + depositBalance= perform_decimal_operation('subtraction', a_deposit.depositBalance, remaining_amount), + expiryTime = a_deposit.expiryTime, + unix_expiryTime = a_deposit.unix_expiryTime, + status='active', + transactionHash= a_deposit.transactionHash, + blockNumber= blockinfo['height'], + blockHash= blockinfo['hash'])) + # ConsumedInfoTable + contract_session.add(ConsumedInfo( id_deposittable= a_deposit.id, + transactionHash= a_deposit.transactionHash, + blockNumber= blockinfo['height'])) + remaining_amount = perform_decimal_operation('subtraction', remaining_amount, a_deposit.depositBalance) + remaining_amount = 0 + break + + elif a_deposit.depositBalance <= remaining_amount: + # accepting token transfer from the contract to depositor's address + returnval = transferToken(contractStructure['accepting_token'], perform_decimal_operation('multiplication', a_deposit.depositBalance, swapPrice), contractStructure['contractAddress'], a_deposit.depositorAddress, transaction_data=transaction_data, parsed_data=parsed_data, isInfiniteToken=None, blockinfo=blockinfo, transactionType='tokenswapDepositSettlement') + if returnval == 0: + logger.info("CRITICAL ERROR | Something went wrong in the token transfer method while doing local Smart Contract Particiaption deposit swap operation") + return 0 + + # If this is the first interaction of the outputlist's address with the given token name, add it to token mapping + systemdb_connection = create_database_connection('system_dbs', {'db_name':'system'}) + firstInteractionCheck = systemdb_connection.execute(f"SELECT * FROM tokenAddressMapping WHERE tokenAddress='{a_deposit.depositorAddress}' AND token='{contractStructure['accepting_token']}'").fetchall() + if len(firstInteractionCheck) == 0: + systemdb_connection.execute(f"INSERT INTO tokenAddressMapping (tokenAddress, token, transactionHash, blockNumber, blockHash) VALUES ('{a_deposit.depositorAddress}', '{contractStructure['accepting_token']}', '{transaction_data['txid']}', '{transaction_data['blockheight']}', '{transaction_data['blockhash']}')") + systemdb_connection.close() + + + contract_session.add(ContractDeposits( depositorAddress= a_deposit.depositorAddress, + depositAmount= perform_decimal_operation('subtraction', 0, a_deposit.depositBalance), + status='deposit-honor', + transactionHash= a_deposit.transactionHash, + blockNumber= blockinfo['height'], + blockHash= blockinfo['hash'])) + + contract_session.add(ContractDeposits( depositorAddress= a_deposit.depositorAddress, + depositBalance= 0, + expiryTime = a_deposit.expiryTime, + unix_expiryTime = a_deposit.unix_expiryTime, + status='consumed', + transactionHash= a_deposit.transactionHash, + blockNumber= blockinfo['height'], + blockHash= blockinfo['hash'])) + # ConsumedInfoTable + contract_session.add(ConsumedInfo( id_deposittable= a_deposit.id, + transactionHash= a_deposit.transactionHash, + blockNumber= blockinfo['height'])) + remaining_amount = perform_decimal_operation('subtraction', remaining_amount, a_deposit.depositBalance) + + systemdb_session = create_database_session_orm('system_dbs', {'db_name':'system'}, SystemBase) + systemdb_entry = systemdb_session.query(TimeActions.activity, TimeActions.contractType, TimeActions.tokens_db, TimeActions.parsed_data).filter(TimeActions.transactionHash == a_deposit.transactionHash).first() + systemdb_session.add(TimeActions( + time = a_deposit.expiryTime, + activity = systemdb_entry[0], + status = 'consumed', + contractName = parsed_data['contractName'], + contractAddress = outputlist[0], + contractType = systemdb_entry[1], + tokens_db = systemdb_entry[2], + parsed_data = systemdb_entry[3], + transactionHash = a_deposit.transactionHash, + blockNumber = blockinfo['height'] + )) + systemdb_session.commit() + del systemdb_session + + # token transfer from the contract to participant's address + returnval = transferToken(contractStructure['selling_token'], swapAmount, outputlist[0], inputlist[0], transaction_data=transaction_data, parsed_data=parsed_data, isInfiniteToken=None, blockinfo=blockinfo, transactionType='tokenswapParticipationSettlement') + if returnval == 0: + logger.info("CRITICAL ERROR | Something went wrong in the token transfer method while doing local Smart Contract Particiaption") + return 0 + + # ContractParticipationTable + contract_session.add(ContractParticipants(participantAddress = transaction_data['senderAddress'], tokenAmount= parsed_data['tokenAmount'], userChoice= swapPrice, transactionHash= transaction_data['txid'], blockNumber= blockinfo['height'], blockHash= blockinfo['hash'], winningAmount = swapAmount)) + + add_contract_transaction_history(contract_name=parsed_data['contractName'], contract_address=outputlist[0], transactionType='participation', transactionSubType='swap', sourceFloAddress=inputlist[0], destFloAddress=outputlist[0], transferAmount=swapAmount, blockNumber=blockinfo['height'], blockHash=blockinfo['hash'], blocktime=blockinfo['time'], transactionHash=transaction_data['txid'], jsonData=json.dumps(transaction_data), parsedFloData=json.dumps(parsed_data)) + + contract_session.commit() + contract_session.close() + + # If this is the first interaction of the participant's address with the given token name, add it to token mapping + systemdb_connection = create_database_connection('system_dbs', {'db_name':'system'}) + firstInteractionCheck = systemdb_connection.execute(f"SELECT * FROM tokenAddressMapping WHERE tokenAddress='{inputlist[0]}' AND token='{contractStructure['selling_token']}'").fetchall() + if len(firstInteractionCheck) == 0: + systemdb_connection.execute(f"INSERT INTO tokenAddressMapping (tokenAddress, token, transactionHash, blockNumber, blockHash) VALUES ('{inputlist[0]}', '{contractStructure['selling_token']}', '{transaction_data['txid']}', '{transaction_data['blockheight']}', '{transaction_data['blockhash']}')") + systemdb_connection.close() + + updateLatestTransaction(transaction_data, parsed_data, f"{parsed_data['contractName']}-{outputlist[0]}", transactionType='tokenswapParticipation') + pushData_SSEapi(f"Token swap successfully performed at contract {parsed_data['contractName']}-{outputlist[0]} with the transaction {transaction_data['txid']}") + + else: + # Reject the participation saying not enough deposit tokens are available + rejectComment = f"Swap participation at transaction {transaction_data['txid']} rejected as requested swap amount is {swapAmount} but {available_deposit_sum} is available" + logger.info(rejectComment) + rejected_transaction_history(transaction_data, parsed_data, inputadd, outputlist[0], rejectComment) + pushData_SSEapi(rejectComment) + return 0 else: - logger.info("Something went wrong in the smartcontract token transfer method") - return 0 - elif partialTransferCounter == 1: - # Transfer only part of the tokens users specified, till the time it reaches maximum amount - returnval = transferToken(parsed_data['tokenIdentification'], perform_decimal_operation('subtraction', maximumsubscriptionamount, amountDeposited), inputlist[0], outputlist[0], transaction_data, parsed_data, blockinfo=blockinfo) - if returnval != 0: - # Store participant details in the smart contract's db - session.add(ContractParticipants(participantAddress=inputadd, - tokenAmount=perform_decimal_operation('subtraction', maximumsubscriptionamount, amountDeposited), - userChoice=parsed_data['userChoice'], - transactionHash=transaction_data['txid'], - blockNumber=transaction_data['blockheight'], - blockHash=transaction_data['blockhash'])) - session.commit() - session.close() - - # Store transfer as part of ContractTransactionHistory - add_contract_transaction_history(contract_name=parsed_data['contractName'], contract_address=outputlist[0], transactionType='participation', transactionSubType=None, sourceFloAddress=inputadd, destFloAddress=outputlist[0], transferAmount=perform_decimal_operation('subtraction', maximumsubscriptionamount, amountDeposited), blockNumber=blockinfo['height'], blockHash=blockinfo['hash'], blocktime=blockinfo['time'], transactionHash=transaction_data['txid'], jsonData=json.dumps(transaction_data), parsedFloData=json.dumps(parsed_data)) - - # Store a mapping of participant address -> Contract participated in - system_session = create_database_session_orm('system_dbs', {'db_name': "system"}, SystemBase) - system_session.add(ContractAddressMapping(address=inputadd, addressType='participant', - tokenAmount=perform_decimal_operation('subtraction', maximumsubscriptionamount, amountDeposited), - contractName=parsed_data['contractName'], - contractAddress=outputlist[0], - transactionHash=transaction_data['txid'], - blockNumber=transaction_data['blockheight'], - blockHash=transaction_data['blockhash'])) - system_session.commit() - system_session.close() - updateLatestTransaction(transaction_data, parsed_data, f"{parsed_data['contractName']}-{outputlist[0]}", transactionType='ote-externaltrigger-participation') - return 1 - - else: - logger.info("Something went wrong in the smartcontract token transfer method") + rejectComment = f"Transaction {transaction_data['txid']} rejected as the participation doesn't belong to any valid contract type" + logger.info(rejectComment) + rejected_contract_transaction_history(transaction_data, parsed_data, 'participation', outputlist[0], inputadd, outputlist[0], rejectComment) return 0 - else: - rejectComment = f"Transaction {transaction_data['txid']} rejected as wrong user choice entered for the Smart Contract named {parsed_data['contractName']} at the address {outputlist[0]}" - logger.info(rejectComment) - rejected_contract_transaction_history(transaction_data, parsed_data, 'participation', outputlist[0], inputadd, outputlist[0], rejectComment) - pushData_SSEapi(rejectComment) - return 0 - - elif 'payeeAddress' in contractStructure: - # This means the contract is of the type internal trigger - if partialTransferCounter == 0: - transferAmount = parsed_data['tokenAmount'] - elif partialTransferCounter == 1: - transferAmount = perform_decimal_operation('subtraction', maximumsubscriptionamount, amountDeposited) - - # Check if the tokenAmount being transferred exists in the address & do the token transfer - returnval = transferToken(parsed_data['tokenIdentification'], transferAmount, inputlist[0], outputlist[0], transaction_data, parsed_data, blockinfo=blockinfo) - if returnval != 0: - # Store participant details in the smart contract's db - session.add(ContractParticipants(participantAddress=inputadd, tokenAmount=transferAmount, userChoice='-', transactionHash=transaction_data['txid'], blockNumber=transaction_data['blockheight'], blockHash=transaction_data['blockhash'])) - - # Store transfer as part of ContractTransactionHistory - add_contract_transaction_history(contract_name=parsed_data['contractName'], contract_address=outputlist[0], transactionType='participation', transactionSubType=None, sourceFloAddress=inputadd, destFloAddress=outputlist[0], transferAmount=transferAmount, blockNumber=blockinfo['height'], blockHash=blockinfo['hash'], blocktime=blockinfo['time'], transactionHash=transaction_data['txid'], jsonData=json.dumps(transaction_data), parsedFloData=json.dumps(parsed_data)) - session.commit() - session.close() - - # Store a mapping of participant address -> Contract participated in - system_session = create_database_session_orm('system_dbs', {'db_name': "system"}, SystemBase) - system_session.add(ContractAddressMapping(address=inputadd, addressType='participant', - tokenAmount=transferAmount, - contractName=parsed_data['contractName'], - contractAddress=outputlist[0], - transactionHash=transaction_data['txid'], - blockNumber=transaction_data['blockheight'], - blockHash=transaction_data['blockhash'])) - system_session.commit() - - # If this is the first interaction of the outputlist's address with the given token name, add it to token mapping - connection = create_database_connection('system_dbs', {'db_name': 'system'}) - firstInteractionCheck = connection.execute(f"SELECT * FROM tokenAddressMapping WHERE tokenAddress='{outputlist[0]}' AND token='{parsed_data['tokenIdentification']}'").fetchall() - if len(firstInteractionCheck) == 0: - connection.execute(f"INSERT INTO tokenAddressMapping (tokenAddress, token, transactionHash, blockNumber, blockHash) VALUES ('{outputlist[0]}', '{parsed_data['tokenIdentification']}', '{transaction_data['txid']}', '{transaction_data['blockheight']}', '{transaction_data['blockhash']}')") - connection.close() - updateLatestTransaction(transaction_data, parsed_data, f"{parsed_data['contractName']}-{outputlist[0]}", transactionType='ote-internaltrigger-participation') - return 1 - - else: - logger.info("Something went wrong in the smartcontract token transfer method") - return 0 - - return 1 # Indicate successful processing of the one-time event transfer - - - - - -def process_continuous_event_transfer(parsed_data, transaction_data, blockinfo,inputlist, outputlist, inputadd, connection, contract_session, contractStructure): - logger.info(f"Processing continuous event transfer for transaction {transaction_data['txid']}") - - # Determine the subtype of the contract - contract_subtype = contract_session.query(ContractStructure.value).filter(ContractStructure.attribute == 'subtype').first()[0] - - - if contract_subtype == 'tokenswap': - # Check if the transaction hash already exists in the contract db (Safety check) - participantAdd_txhash = connection.execute('SELECT participantAddress, transactionHash FROM contractparticipants').fetchall() - participantAdd_txhash_T = list(zip(*participantAdd_txhash)) - - if len(participantAdd_txhash) != 0 and transaction_data['txid'] in list(participantAdd_txhash_T[1]): - logger.warning(f"Transaction {transaction_data['txid']} rejected as it already exists in the Smart Contract db. This is unusual, please check your code") - pushData_SSEapi(f"Error | Transaction {transaction_data['txid']} rejected as it already exists in the Smart Contract db. This is unusual, please check your code") - return 0 - - # if contractAddress was passed, then check if it matches the output address of this contract - if 'contractAddress' in parsed_data: - if parsed_data['contractAddress'] != outputlist[0]: - rejectComment = f"Contract participation at transaction {transaction_data['txid']} rejected as contractAddress specified in flodata, {parsed_data['contractAddress']}, doesnt not match with transaction's output address {outputlist[0]}" + else: + rejectComment = f"Transaction {transaction_data['txid']} rejected as a Smart Contract with the name {parsed_data['contractName']} at address {outputlist[0]} doesnt exist" logger.info(rejectComment) rejected_contract_transaction_history(transaction_data, parsed_data, 'participation', outputlist[0], inputadd, outputlist[0], rejectComment) - # Pass information to SSE channel - pushData_SSEapi(f"Error| Mismatch in contract address specified in flodata and the output address of the transaction {transaction_data['txid']}") - return 0 - - if contractStructure['pricetype'] in ['predetermined','determined']: - swapPrice = float(contractStructure['price']) - elif contractStructure['pricetype'] == 'dynamic': - # Oracle address cannot be a participant in the contract. Check if the sender address is oracle address - if transaction_data['senderAddress'] == contractStructure['oracle_address']: - logger.warning(f"Transaction {transaction_data['txid']} rejected as the oracle addess {contractStructure['oracle_address']} is attempting to participate. Please report this to the contract owner") - pushData_SSEapi(f"Transaction {transaction_data['txid']} rejected as the oracle addess {contractStructure['oracle_address']} is attempting to participate. Please report this to the contract owner") return 0 - swapPrice = fetchDynamicSwapPrice(connection, blockinfo) + elif parsed_data['transferType'] == 'nft': + if not is_a_contract_address(inputlist[0]) and not is_a_contract_address(outputlist[0]): + # check if the token exists in the database + if check_database_existence('token', {'token_name':f"{parsed_data['tokenIdentification']}"}): + # Pull details of the token type from system.db database + connection = create_database_connection('system_dbs', {'db_name':'system'}) + db_details = connection.execute("SELECT db_name, db_type, keyword, object_format FROM databaseTypeMapping WHERE db_name='{}'".format(parsed_data['tokenIdentification'])) + db_details = list(zip(*db_details)) + if db_details[1][0] == 'infinite-token': + db_object = json.loads(db_details[3][0]) + if db_object['root_address'] == inputlist[0]: + isInfiniteToken = True + else: + isInfiniteToken = False + else: + isInfiniteToken = False - swapAmount = perform_decimal_operation('division', parsed_data['tokenAmount'], swapPrice) + # Check if the transaction hash already exists in the token db + connection = create_database_connection('token', {'token_name':f"{parsed_data['tokenIdentification']}"}) + blockno_txhash = connection.execute('SELECT blockNumber, transactionHash FROM transactionHistory').fetchall() + connection.close() + blockno_txhash_T = list(zip(*blockno_txhash)) - # Check if the swap amount is available in the deposits of the selling token - # if yes do the transfers, otherwise reject the transaction - # - subquery = contract_session.query(func.max(ContractDeposits.id)).group_by(ContractDeposits.transactionHash) - active_contract_deposits = contract_session.query(ContractDeposits).filter(ContractDeposits.id.in_(subquery)).filter(ContractDeposits.status != 'deposit-return').filter(ContractDeposits.status != 'consumed').filter(ContractDeposits.status == 'active').all() - - # todo - what is the role of the next line? cleanup if not useful - available_deposits = active_contract_deposits[:] - - # available_deposit_sum = contract_session.query(func.sum(ContractDeposits.depositBalance)).filter(ContractDeposits.id.in_(subquery)).filter(ContractDeposits.status != 'deposit-return').filter(ContractDeposits.status == 'active').all() - - query_data = contract_session.query(ContractDeposits.depositBalance).filter(ContractDeposits.id.in_(subquery)).filter(ContractDeposits.status != 'deposit-return').filter(ContractDeposits.status == 'active').all() - - available_deposit_sum = sum(Decimal(f"{amount[0]}") if amount[0] is not None else Decimal(0) for amount in query_data) - if available_deposit_sum==0 or available_deposit_sum[0][0] is None: - available_deposit_sum = 0 - else: - available_deposit_sum = float(available_deposit_sum[0][0]) - - - if available_deposit_sum >= swapAmount: - # Accepting token transfer from participant to smart contract address - logger.info(f"Accepting 'tokenswapParticipation' transaction ID {transaction_data['txid']} from participant {inputlist[0]} for {parsed_data['tokenAmount']} {parsed_data['tokenIdentification']}# to smart contract address for swap amount {swapAmount} and swap-price {swapPrice}") - returnval = transferToken(parsed_data['tokenIdentification'], parsed_data['tokenAmount'], inputlist[0], outputlist[0], transaction_data=transaction_data, parsed_data=parsed_data, isInfiniteToken=None, blockinfo=blockinfo, transactionType='tokenswapParticipation') - if returnval == 0: - logger.info("ERROR | Something went wrong in the token transfer method while doing local Smart Contract Participation") - return 0 - - # If this is the first interaction of the outputlist's address with the given token name, add it to token mapping - systemdb_connection = create_database_connection('system_dbs', {'db_name': 'system'}) - firstInteractionCheck = systemdb_connection.execute(f"SELECT * FROM tokenAddressMapping WHERE tokenAddress='{outputlist[0]}' AND token='{parsed_data['tokenIdentification']}'").fetchall() - if len(firstInteractionCheck) == 0: - systemdb_connection.execute(f"INSERT INTO tokenAddressMapping (tokenAddress, token, transactionHash, blockNumber, blockHash) VALUES ('{outputlist[0]}', '{parsed_data['tokenIdentification']}', '{transaction_data['txid']}', '{transaction_data['blockheight']}', '{transaction_data['blockhash']}')") - systemdb_connection.close() - - # ContractDepositTable - # For each unique deposit( address, expirydate, blocknumber) there will be 2 entries added to the table - # the consumption of the deposits will start form the top of the table - deposit_counter = 0 - remaining_amount = swapAmount - for a_deposit in available_deposits: - if a_deposit.depositBalance > remaining_amount: - # accepting token transfer from the contract to depositor's address - returnval = transferToken(contractStructure['accepting_token'], perform_decimal_operation('multiply', remaining_amount, swapPrice), contractStructure['contractAddress'], a_deposit.depositorAddress, transaction_data=transaction_data, parsed_data=parsed_data, isInfiniteToken=None, blockinfo=blockinfo, transactionType='tokenswapDepositSettlement') - if returnval == 0: - logger.info("CRITICAL ERROR | Something went wrong in the token transfer method while doing local Smart Contract Particiaption deposit swap operation") + if transaction_data['txid'] in list(blockno_txhash_T[1]): + logger.warning(f"Transaction {transaction_data['txid']} already exists in the token db. This is unusual, please check your code") + pushData_SSEapi(f"Error | Transaction {transaction_data['txid']} already exists in the token db. This is unusual, please check your code") return 0 + + returnval = transferToken(parsed_data['tokenIdentification'], parsed_data['tokenAmount'], inputlist[0],outputlist[0], transaction_data, parsed_data, isInfiniteToken=isInfiniteToken, blockinfo = blockinfo) + if returnval == 0: + logger.info("Something went wrong in the token transfer method") + pushData_SSEapi(f"Error | Something went wrong while doing the internal db transactions for {transaction_data['txid']}") + return 0 + else: + updateLatestTransaction(transaction_data, parsed_data, f"{parsed_data['tokenIdentification']}", transactionType='token-transfer') # If this is the first interaction of the outputlist's address with the given token name, add it to token mapping - systemdb_connection = create_database_connection('system_dbs', {'db_name':'system'}) - firstInteractionCheck = systemdb_connection.execute(f"SELECT * FROM tokenAddressMapping WHERE tokenAddress='{a_deposit.depositorAddress}' AND token='{contractStructure['accepting_token']}'").fetchall() + connection = create_database_connection('system_dbs', {'db_name':'system'}) + firstInteractionCheck = connection.execute(f"SELECT * FROM tokenAddressMapping WHERE tokenAddress='{outputlist[0]}' AND token='{parsed_data['tokenIdentification']}'").fetchall() + if len(firstInteractionCheck) == 0: - systemdb_connection.execute(f"INSERT INTO tokenAddressMapping (tokenAddress, token, transactionHash, blockNumber, blockHash) VALUES ('{a_deposit.depositorAddress}', '{contractStructure['accepting_token']}', '{transaction_data['txid']}', '{transaction_data['blockheight']}', '{transaction_data['blockhash']}')") - systemdb_connection.close() + connection.execute(f"INSERT INTO tokenAddressMapping (tokenAddress, token, transactionHash, blockNumber, blockHash) VALUES ('{outputlist[0]}', '{parsed_data['tokenIdentification']}', '{transaction_data['txid']}', '{transaction_data['blockheight']}', '{transaction_data['blockhash']}')") + connection.close() - contract_session.add(ContractDeposits( depositorAddress= a_deposit.depositorAddress, - depositAmount= perform_decimal_operation('subtraction', 0, remaining_amount), - status='deposit-honor', - transactionHash= a_deposit.transactionHash, - blockNumber= blockinfo['height'], - blockHash= blockinfo['hash'])) - - # if the total is consumsed then the following entry won't take place - contract_session.add(ContractDeposits( depositorAddress= a_deposit.depositorAddress, - depositBalance= perform_decimal_operation('subtraction', a_deposit.depositBalance, remaining_amount), - expiryTime = a_deposit.expiryTime, - unix_expiryTime = a_deposit.unix_expiryTime, - status='active', - transactionHash= a_deposit.transactionHash, - blockNumber= blockinfo['height'], - blockHash= blockinfo['hash'])) - # ConsumedInfoTable - contract_session.add(ConsumedInfo( id_deposittable= a_deposit.id, - transactionHash= a_deposit.transactionHash, - blockNumber= blockinfo['height'])) - remaining_amount = perform_decimal_operation('subtraction', remaining_amount, a_deposit.depositBalance) - remaining_amount = 0 - break - - - elif a_deposit.depositBalance <= remaining_amount: - # accepting token transfer from the contract to depositor's address - logger.info(f"Performing 'tokenswapSettlement' transaction ID {transaction_data['txid']} from participant {a_deposit.depositorAddress} for {perform_decimal_operation('multiplication', a_deposit.depositBalance, swapPrice)} {contractStructure['accepting_token']}# ") - returnval = transferToken(contractStructure['accepting_token'], perform_decimal_operation('multiplication', a_deposit.depositBalance, swapPrice), contractStructure['contractAddress'], a_deposit.depositorAddress, transaction_data=transaction_data, parsed_data=parsed_data, isInfiniteToken=None, blockinfo=blockinfo, transactionType='tokenswapDepositSettlement') - if returnval == 0: - logger.info("CRITICAL ERROR | Something went wrong in the token transfer method while doing local Smart Contract Particiaption deposit swap operation") - return 0 - - # If this is the first interaction of the outputlist's address with the given token name, add it to token mapping - systemdb_connection = create_database_connection('system_dbs', {'db_name':'system'}) - firstInteractionCheck = systemdb_connection.execute(f"SELECT * FROM tokenAddressMapping WHERE tokenAddress='{a_deposit.depositorAddress}' AND token='{contractStructure['accepting_token']}'").fetchall() - if len(firstInteractionCheck) == 0: - systemdb_connection.execute(f"INSERT INTO tokenAddressMapping (tokenAddress, token, transactionHash, blockNumber, blockHash) VALUES ('{a_deposit.depositorAddress}', '{contractStructure['accepting_token']}', '{transaction_data['txid']}', '{transaction_data['blockheight']}', '{transaction_data['blockhash']}')") - systemdb_connection.close() - - - contract_session.add(ContractDeposits( depositorAddress= a_deposit.depositorAddress, - depositAmount= perform_decimal_operation('subtraction', 0, a_deposit.depositBalance), - status='deposit-honor', - transactionHash= a_deposit.transactionHash, - blockNumber= blockinfo['height'], - blockHash= blockinfo['hash'])) - - contract_session.add(ContractDeposits( depositorAddress= a_deposit.depositorAddress, - depositBalance= 0, - expiryTime = a_deposit.expiryTime, - unix_expiryTime = a_deposit.unix_expiryTime, - status='consumed', - transactionHash= a_deposit.transactionHash, - blockNumber= blockinfo['height'], - blockHash= blockinfo['hash'])) - # ConsumedInfoTable - contract_session.add(ConsumedInfo( id_deposittable= a_deposit.id, - transactionHash= a_deposit.transactionHash, - blockNumber= blockinfo['height'])) - remaining_amount = perform_decimal_operation('subtraction', remaining_amount, a_deposit.depositBalance) - - systemdb_session = create_database_session_orm('system_dbs', {'db_name':'system'}, SystemBase) - systemdb_entry = systemdb_session.query(TimeActions.activity, TimeActions.contractType, TimeActions.tokens_db, TimeActions.parsed_data).filter(TimeActions.transactionHash == a_deposit.transactionHash).first() - systemdb_session.add(TimeActions( - time = a_deposit.expiryTime, - activity = systemdb_entry[0], - status = 'consumed', - contractName = parsed_data['contractName'], - contractAddress = outputlist[0], - contractType = systemdb_entry[1], - tokens_db = systemdb_entry[2], - parsed_data = systemdb_entry[3], - transactionHash = a_deposit.transactionHash, - blockNumber = blockinfo['height'] - )) - systemdb_session.commit() - del systemdb_session - - # token transfer from the contract to participant's address - logger.info(f"Performing 'tokenswapParticipationSettlement' transaction ID {transaction_data['txid']} from participant {outputlist[0]} for {swapAmount} {contractStructure['selling_token']}# ") - returnval = transferToken(contractStructure['selling_token'], swapAmount, outputlist[0], inputlist[0], transaction_data=transaction_data, parsed_data=parsed_data, isInfiniteToken=None, blockinfo=blockinfo, transactionType='tokenswapParticipationSettlement') - if returnval == 0: - logger.info("CRITICAL ERROR | Something went wrong in the token transfer method while doing local Smart Contract Particiaption") - return 0 - - # ContractParticipationTable - contract_session.add(ContractParticipants(participantAddress = transaction_data['senderAddress'], tokenAmount= parsed_data['tokenAmount'], userChoice= swapPrice, transactionHash= transaction_data['txid'], blockNumber= blockinfo['height'], blockHash= blockinfo['hash'], winningAmount = swapAmount)) - - add_contract_transaction_history(contract_name=parsed_data['contractName'], contract_address=outputlist[0], transactionType='participation', transactionSubType='swap', sourceFloAddress=inputlist[0], destFloAddress=outputlist[0], transferAmount=swapAmount, blockNumber=blockinfo['height'], blockHash=blockinfo['hash'], blocktime=blockinfo['time'], transactionHash=transaction_data['txid'], jsonData=json.dumps(transaction_data), parsedFloData=json.dumps(parsed_data)) - - contract_session.commit() - contract_session.close() - - # If this is the first interaction of the participant's address with the given token name, add it to token mapping - systemdb_connection = create_database_connection('system_dbs', {'db_name':'system'}) - firstInteractionCheck = systemdb_connection.execute(f"SELECT * FROM tokenAddressMapping WHERE tokenAddress='{inputlist[0]}' AND token='{contractStructure['selling_token']}'").fetchall() - if len(firstInteractionCheck) == 0: - systemdb_connection.execute(f"INSERT INTO tokenAddressMapping (tokenAddress, token, transactionHash, blockNumber, blockHash) VALUES ('{inputlist[0]}', '{contractStructure['selling_token']}', '{transaction_data['txid']}', '{transaction_data['blockheight']}', '{transaction_data['blockhash']}')") - systemdb_connection.close() - - updateLatestTransaction(transaction_data, parsed_data, f"{parsed_data['contractName']}-{outputlist[0]}", transactionType='tokenswapParticipation') - pushData_SSEapi(f"Token swap successfully performed at contract {parsed_data['contractName']}-{outputlist[0]} with the transaction {transaction_data['txid']}") - - else: - # Reject the participation saying not enough deposit tokens are available - rejectComment = f"Swap participation at transaction {transaction_data['txid']} rejected as requested swap amount is {swapAmount} but {available_deposit_sum} is available" - logger.info(rejectComment) - rejected_transaction_history(transaction_data, parsed_data, inputlist[0], outputlist[0], rejectComment) - pushData_SSEapi(rejectComment) - return 0 - - return 1 # Indicate successful processing of the continuous event transfer - - - - -def process_smart_contract_transfer(parsed_data, transaction_data, blockinfo,inputlist, outputlist, inputadd): - # Check if the smart contract exists - if check_database_existence('smart_contract', {'contract_name': f"{parsed_data['contractName']}", 'contract_address': f"{outputlist[0]}"}): - connection = create_database_connection('smart_contract', {'contract_name':f"{parsed_data['contractName']}", 'contract_address':f"{outputlist[0]}"}) - contract_session = create_database_session_orm('smart_contract', {'contract_name':f"{parsed_data['contractName']}", 'contract_address':f"{outputlist[0]}"}, ContractBase) - - contractStructure = extract_contractStructure(parsed_data['contractName'], outputlist[0]) - - # Fetch the contract type - contract_type = contract_session.query(ContractStructure.value).filter(ContractStructure.attribute == 'contractType').first()[0] - - # Process based on contract type - if contract_type == 'one-time-event': - return process_one_time_event_transfer(parsed_data, transaction_data, blockinfo, inputlist, outputlist, inputadd, connection, contract_session, contractStructure) - - elif contract_type == 'continuous-event': - return process_continuous_event_transfer(parsed_data, transaction_data, blockinfo, inputlist, outputlist, inputadd, connection, contract_session, contractStructure) - - else: - rejectComment = f"Smart contract transfer at transaction {transaction_data['txid']} rejected due to unknown contract type" - logger.info(rejectComment) - rejected_transaction_history(transaction_data, parsed_data, inputlist[0], outputlist[0], rejectComment) - return 0 - - else: - rejectComment = f"Smart contract transfer at transaction {transaction_data['txid']} rejected as the smart contract does not exist" - logger.info(rejectComment) - rejected_transaction_history(transaction_data, parsed_data, inputlist[0], outputlist[0], rejectComment) - pushData_SSEapi(rejectComment) - return 0 - - - - -def process_nft_transfer(parsed_data, transaction_data, blockinfo,inputlist, outputlist, inputadd): - if not is_a_contract_address(inputlist[0]) and not is_a_contract_address(outputlist[0]): - # check if the token exists in the database - if check_database_existence('token', {'token_name':f"{parsed_data['tokenIdentification']}"}): - # Pull details of the token type from system.db database - connection = create_database_connection('system_dbs', {'db_name':'system'}) - db_details = connection.execute("SELECT db_name, db_type, keyword, object_format FROM databaseTypeMapping WHERE db_name='{}'".format(parsed_data['tokenIdentification'])) - db_details = list(zip(*db_details)) - if db_details[1][0] == 'infinite-token': - db_object = json.loads(db_details[3][0]) - if db_object['root_address'] == inputlist[0]: - isInfiniteToken = True + # Pass information to SSE channel + headers = {'Accept': 'application/json', 'Content-Type': 'application/json'} + # r = requests.post(tokenapi_sse_url, json={f"message': 'Token Transfer | name:{parsed_data['tokenIdentification']} | transactionHash:{transaction_data['txid']}"}, headers=headers) + return 1 else: - isInfiniteToken = False + rejectComment = f"Token transfer at transaction {transaction_data['txid']} rejected as a token with the name {parsed_data['tokenIdentification']} doesnt not exist" + logger.info(rejectComment) + rejected_transaction_history(transaction_data, parsed_data, inputadd, outputlist[0], rejectComment) + pushData_SSEapi(rejectComment) + return 0 + else: - isInfiniteToken = False - - # Check if the transaction hash already exists in the token db - connection = create_database_connection('token', {'token_name':f"{parsed_data['tokenIdentification']}"}) - blockno_txhash = connection.execute('SELECT blockNumber, transactionHash FROM transactionHistory').fetchall() - connection.close() - blockno_txhash_T = list(zip(*blockno_txhash)) - - if transaction_data['txid'] in list(blockno_txhash_T[1]): - logger.warning(f"Transaction {transaction_data['txid']} already exists in the token db. This is unusual, please check your code") - pushData_SSEapi(f"Error | Transaction {transaction_data['txid']} already exists in the token db. This is unusual, please check your code") - return 0 - - returnval = transferToken(parsed_data['tokenIdentification'], parsed_data['tokenAmount'], inputlist[0],outputlist[0], transaction_data, parsed_data, isInfiniteToken=isInfiniteToken, blockinfo = blockinfo) - if returnval == 0: - logger.info("Something went wrong in the token transfer method") - pushData_SSEapi(f"Error | Something went wrong while doing the internal db transactions for {transaction_data['txid']}") + rejectComment = f"Token transfer at transaction {transaction_data['txid']} rejected as either the input address or the output address is part of a contract address" + logger.info(rejectComment) + rejected_transaction_history(transaction_data, parsed_data, inputadd, outputlist[0], rejectComment) + pushData_SSEapi(rejectComment) return 0 + + # todo Rule 47 - If the parsed data type is token incorporation, then check if the name hasn't been taken already + # if it has been taken then reject the incorporation. Else incorporate it + elif parsed_data['type'] == 'tokenIncorporation': + if not is_a_contract_address(inputlist[0]): + if not check_database_existence('token', {'token_name':f"{parsed_data['tokenIdentification']}"}): + session = create_database_session_orm('token', {'token_name': f"{parsed_data['tokenIdentification']}"}, TokenBase) + session.add(ActiveTable(address=inputlist[0], parentid=0, transferBalance=parsed_data['tokenAmount'], addressBalance=parsed_data['tokenAmount'], blockNumber=blockinfo['height'])) + session.add(TransferLogs(sourceFloAddress=inputadd, destFloAddress=outputlist[0], + transferAmount=parsed_data['tokenAmount'], sourceId=0, destinationId=1, + blockNumber=transaction_data['blockheight'], time=transaction_data['time'], + transactionHash=transaction_data['txid'])) + + add_transaction_history(token_name=parsed_data['tokenIdentification'], sourceFloAddress=inputadd, destFloAddress=outputlist[0], transferAmount=parsed_data['tokenAmount'], blockNumber=transaction_data['blockheight'], blockHash=transaction_data['blockhash'], blocktime=transaction_data['time'], transactionHash=transaction_data['txid'], jsonData=json.dumps(transaction_data), transactionType=parsed_data['type'], parsedFloData=json.dumps(parsed_data)) + + session.commit() + session.close() + + # add it to token address to token mapping db table + connection = create_database_connection('system_dbs', {'db_name':'system'}) + connection.execute(f"INSERT INTO tokenAddressMapping (tokenAddress, token, transactionHash, blockNumber, blockHash) VALUES ('{inputadd}', '{parsed_data['tokenIdentification']}', '{transaction_data['txid']}', '{transaction_data['blockheight']}', '{transaction_data['blockhash']}');") + connection.execute(f"INSERT INTO databaseTypeMapping (db_name, db_type, keyword, object_format, blockNumber) VALUES ('{parsed_data['tokenIdentification']}', 'token', '', '', '{transaction_data['blockheight']}')") + connection.close() + + updateLatestTransaction(transaction_data, parsed_data, f"{parsed_data['tokenIdentification']}") + pushData_SSEapi(f"Token | Successfully incorporated token {parsed_data['tokenIdentification']} at transaction {transaction_data['txid']}") + return 1 else: - updateLatestTransaction(transaction_data, parsed_data, f"{parsed_data['tokenIdentification']}", transactionType='token-transfer') - - # If this is the first interaction of the outputlist's address with the given token name, add it to token mapping - connection = create_database_connection('system_dbs', {'db_name':'system'}) - firstInteractionCheck = connection.execute(f"SELECT * FROM tokenAddressMapping WHERE tokenAddress='{outputlist[0]}' AND token='{parsed_data['tokenIdentification']}'").fetchall() - - if len(firstInteractionCheck) == 0: - connection.execute(f"INSERT INTO tokenAddressMapping (tokenAddress, token, transactionHash, blockNumber, blockHash) VALUES ('{outputlist[0]}', '{parsed_data['tokenIdentification']}', '{transaction_data['txid']}', '{transaction_data['blockheight']}', '{transaction_data['blockhash']}')") - - connection.close() - - # Pass information to SSE channel - headers = {'Accept': 'application/json', 'Content-Type': 'application/json'} - # r = requests.post(tokenapi_sse_url, json={f"message': 'Token Transfer | name:{parsed_data['tokenIdentification']} | transactionHash:{transaction_data['txid']}"}, headers=headers) - return 1 + rejectComment = f"Token incorporation rejected at transaction {transaction_data['txid']} as token {parsed_data['tokenIdentification']} already exists" + logger.info(rejectComment) + rejected_transaction_history(transaction_data, parsed_data, inputadd, outputlist[0], rejectComment) + pushData_SSEapi(rejectComment) + return 0 else: - rejectComment = f"Token transfer at transaction {transaction_data['txid']} rejected as a token with the name {parsed_data['tokenIdentification']} doesnt not exist" - logger.info(rejectComment) - rejected_transaction_history(transaction_data, parsed_data, inputadd, outputlist[0], rejectComment) - pushData_SSEapi(rejectComment) - return 0 - - else: - rejectComment = f"Token transfer at transaction {transaction_data['txid']} rejected as either the input address or the output address is part of a contract address" - logger.info(rejectComment) - rejected_transaction_history(transaction_data, parsed_data, inputadd, outputlist[0], rejectComment) - pushData_SSEapi(rejectComment) - return 0 - - -# todo Rule 47 - If the parsed data type is token incorporation, then check if the name hasn't been taken already -# if it has been taken then reject the incorporation. Else incorporate it -def process_token_incorporation(parsed_data, transaction_data, blockinfo,inputlist, outputlist, inputadd): - logger.info("Processing token incorporation...") - - if not is_a_contract_address(inputlist[0]): - if not check_database_existence('token', {'token_name':f"{parsed_data['tokenIdentification']}"}): - session = create_database_session_orm('token', {'token_name': f"{parsed_data['tokenIdentification']}"}, TokenBase) - session.add(ActiveTable(address=inputlist[0], parentid=0, transferBalance=parsed_data['tokenAmount'], addressBalance=parsed_data['tokenAmount'], blockNumber=blockinfo['height'])) - session.add(TransferLogs(sourceFloAddress=inputadd, destFloAddress=outputlist[0], - transferAmount=parsed_data['tokenAmount'], sourceId=0, destinationId=1, - blockNumber=transaction_data['blockheight'], time=transaction_data['time'], - transactionHash=transaction_data['txid'])) - - add_transaction_history(token_name=parsed_data['tokenIdentification'], sourceFloAddress=inputadd, destFloAddress=outputlist[0], transferAmount=parsed_data['tokenAmount'], blockNumber=transaction_data['blockheight'], blockHash=transaction_data['blockhash'], blocktime=transaction_data['time'], transactionHash=transaction_data['txid'], jsonData=json.dumps(transaction_data), transactionType=parsed_data['type'], parsedFloData=json.dumps(parsed_data)) - - session.commit() - session.close() - - # add it to token address to token mapping db table - connection = create_database_connection('system_dbs', {'db_name':'system'}) - connection.execute(f"INSERT INTO tokenAddressMapping (tokenAddress, token, transactionHash, blockNumber, blockHash) VALUES ('{inputadd}', '{parsed_data['tokenIdentification']}', '{transaction_data['txid']}', '{transaction_data['blockheight']}', '{transaction_data['blockhash']}');") - connection.execute(f"INSERT INTO databaseTypeMapping (db_name, db_type, keyword, object_format, blockNumber) VALUES ('{parsed_data['tokenIdentification']}', 'token', '', '', '{transaction_data['blockheight']}')") - connection.close() - - updateLatestTransaction(transaction_data, parsed_data, f"{parsed_data['tokenIdentification']}") - logger.info(f"Token | Successfully incorporated token {parsed_data['tokenIdentification']} at transaction {transaction_data['txid']}") - pushData_SSEapi(f"Token | Successfully incorporated token {parsed_data['tokenIdentification']} at transaction {transaction_data['txid']}") - return 1 - else: - rejectComment = f"Token incorporation rejected at transaction {transaction_data['txid']} as token {parsed_data['tokenIdentification']} already exists" + rejectComment = f"Token incorporation at transaction {transaction_data['txid']} rejected as either the input address is part of a contract address" logger.info(rejectComment) rejected_transaction_history(transaction_data, parsed_data, inputadd, outputlist[0], rejectComment) pushData_SSEapi(rejectComment) return 0 - else: - rejectComment = f"Token incorporation at transaction {transaction_data['txid']} rejected as either the input address is part of a contract address" - logger.info(rejectComment) - rejected_transaction_history(transaction_data, parsed_data, inputadd, outputlist[0], rejectComment) - pushData_SSEapi(rejectComment) - return 0 - -# Rule 48 - If the parsed data type if smart contract incorporation, then check if the name hasn't been taken already -# if it has been taken then reject the incorporation. -def process_smart_contract_incorporation(parsed_data, transaction_data, blockinfo,inputlist, outputlist, inputadd): - logger.info(f"Processing smart contract incorporation for transaction {transaction_data['txid']}") + # todo Rule 48 - If the parsed data type if smart contract incorporation, then check if the name hasn't been taken already + # if it has been taken then reject the incorporation. + elif parsed_data['type'] == 'smartContractIncorporation': if not check_database_existence('smart_contract', {'contract_name':f"{parsed_data['contractName']}", 'contract_address':f"{parsed_data['contractAddress']}"}): # Cannot incorporate on an address with any previous token transaction systemdb_session = create_database_session_orm('system_dbs', {'db_name':'system'}, SystemBase) @@ -2462,7 +2079,7 @@ def process_smart_contract_incorporation(parsed_data, transaction_data, blockinf delete_contract_database({'contract_name': parsed_data['contractName'], 'contract_address': parsed_data['contractAddress']}) return 0 -def process_smart_contract_pays(parsed_data, transaction_data, blockinfo,inputlist, outputlist, inputadd): + elif parsed_data['type'] == 'smartContractPays': logger.info(f"Transaction {transaction_data['txid']} is of the type smartContractPays") committeeAddressList = refresh_committee_list(APP_ADMIN, neturl, blockinfo['time']) # Check if input address is a committee address @@ -2661,8 +2278,7 @@ def process_smart_contract_pays(parsed_data, transaction_data, blockinfo,inputli pushData_SSEapi(rejectComment) return 0 - -def process_smart_contract_deposit(parsed_data, transaction_data, blockinfo,inputlist, outputlist, inputadd): + elif parsed_data['type'] == 'smartContractDeposit': if check_database_existence('smart_contract', {'contract_name':f"{parsed_data['contractName']}", 'contract_address':f"{outputlist[0]}"}): # Reject if the deposit expiry time is greater than incorporated blocktime expiry_time = convert_datetime_to_arrowobject(parsed_data['depositConditions']['expiryTime']) @@ -2678,7 +2294,7 @@ def process_smart_contract_deposit(parsed_data, transaction_data, blockinfo,inpu participantAdd_txhash_T = list(zip(*participantAdd_txhash)) if len(participantAdd_txhash) != 0 and transaction_data['txid'] in list(participantAdd_txhash_T[1]): - rejectComment = f"Contract deposit at transaction {transaction_data['txid']} rejected as it already exists in the Smart Contract db. This is unusual, please check your code" + rejectComment = f"Transaction {transaction_data['txid']} rejected as it already exists in the Smart Contract db. This is unusual, please check your code" logger.warning(rejectComment) rejected_contract_transaction_history(transaction_data, parsed_data, 'deposit', outputlist[0], inputadd, outputlist[0], rejectComment) return 0 @@ -2697,7 +2313,6 @@ def process_smart_contract_deposit(parsed_data, transaction_data, blockinfo,inpu contractStructure = extract_contractStructure(parsed_data['contractName'], outputlist[0]) # Transfer the token - logger.info(f"Initiating transfers for smartcontract deposit with transaction ID {transaction_data['txid']}") returnval = transferToken(parsed_data['tokenIdentification'], parsed_data['depositAmount'], inputlist[0], outputlist[0], transaction_data, parsed_data, blockinfo=blockinfo) if returnval == 0: logger.info("Something went wrong in the token transfer method") @@ -2749,202 +2364,91 @@ def process_smart_contract_deposit(parsed_data, transaction_data, blockinfo,inpu return 1 else: - rejectComment = f"Deposit Transaction {transaction_data['txid']} rejected as a Smart Contract with the name {parsed_data['contractName']} at address {outputlist[0]} doesnt exist" + rejectComment = f"Transaction {transaction_data['txid']} rejected as a Smart Contract with the name {parsed_data['contractName']} at address {outputlist[0]} doesnt exist" logger.info(rejectComment) rejected_contract_transaction_history(transaction_data, parsed_data, 'smartContractDeposit', outputlist[0], inputadd, outputlist[0], rejectComment) return 0 - -def process_nft_incorporation(parsed_data, transaction_data, blockinfo,inputlist, outputlist, inputadd): - ''' + + elif parsed_data['type'] == 'nftIncorporation': + ''' DIFFERENT BETWEEN TOKEN AND NFT System.db will have a different entry in creation nft word will be extra NFT Hash must be present Creation and transfer amount .. only integer parts will be taken Keyword nft must be present in both creation and transfer - ''' + ''' + if not is_a_contract_address(inputlist[0]): + if not check_database_existence('token', {'token_name':f"{parsed_data['tokenIdentification']}"}): + session = create_database_session_orm('token', {'token_name': f"{parsed_data['tokenIdentification']}"}, TokenBase) + session.add(ActiveTable(address=inputlist[0], parentid=0, transferBalance=parsed_data['tokenAmount'], addressBalance=parsed_data['tokenAmount'], blockNumber=blockinfo['height'])) + session.add(TransferLogs(sourceFloAddress=inputadd, destFloAddress=outputlist[0], transferAmount=parsed_data['tokenAmount'], sourceId=0, destinationId=1, blockNumber=transaction_data['blockheight'], time=transaction_data['time'], transactionHash=transaction_data['txid'])) + add_transaction_history(token_name=parsed_data['tokenIdentification'], sourceFloAddress=inputadd, destFloAddress=outputlist[0], transferAmount=parsed_data['tokenAmount'], blockNumber=transaction_data['blockheight'], blockHash=transaction_data['blockhash'], blocktime=transaction_data['time'], transactionHash=transaction_data['txid'], jsonData=json.dumps(transaction_data), transactionType=parsed_data['type'], parsedFloData=json.dumps(parsed_data)) + + session.commit() + session.close() - if not is_a_contract_address(inputlist[0]): - if not check_database_existence('token', {'token_name': f"{parsed_data['tokenIdentification']}"}): - session = create_database_session_orm('token', {'token_name': f"{parsed_data['tokenIdentification']}"}, TokenBase) - session.add(ActiveTable(address=inputlist[0], parentid=0, transferBalance=parsed_data['tokenAmount'], addressBalance=parsed_data['tokenAmount'], blockNumber=blockinfo['height'])) - session.add(TransferLogs(sourceFloAddress=inputadd, destFloAddress=outputlist[0], transferAmount=parsed_data['tokenAmount'], sourceId=0, destinationId=1, blockNumber=transaction_data['blockheight'], time=transaction_data['time'], transactionHash=transaction_data['txid'])) - add_transaction_history(token_name=parsed_data['tokenIdentification'], sourceFloAddress=inputadd, destFloAddress=outputlist[0], transferAmount=parsed_data['tokenAmount'], blockNumber=transaction_data['blockheight'], blockHash=transaction_data['blockhash'], blocktime=transaction_data['time'], transactionHash=transaction_data['txid'], jsonData=json.dumps(transaction_data), transactionType=parsed_data['type'], parsedFloData=json.dumps(parsed_data)) - - session.commit() - session.close() - - # Add it to token address to token mapping db table - connection = create_database_connection('system_dbs', {'db_name': 'system'}) - connection.execute(f"INSERT INTO tokenAddressMapping (tokenAddress, token, transactionHash, blockNumber, blockHash) VALUES ('{inputadd}', '{parsed_data['tokenIdentification']}', '{transaction_data['txid']}', '{transaction_data['blockheight']}', '{transaction_data['blockhash']}');") - nft_data = {'sha256_hash': f"{parsed_data['nftHash']}"} - connection.execute(f"INSERT INTO databaseTypeMapping (db_name, db_type, keyword, object_format, blockNumber) VALUES ('{parsed_data['tokenIdentification']}', 'nft', '', '{json.dumps(nft_data)}', '{transaction_data['blockheight']}')") - connection.close() - - updateLatestTransaction(transaction_data, parsed_data, f"{parsed_data['tokenIdentification']}") - pushData_SSEapi(f"NFT | Successfully incorporated NFT {parsed_data['tokenIdentification']} at transaction {transaction_data['txid']}") - return 1 - else: - rejectComment = f"Transaction {transaction_data['txid']} rejected as an NFT with the name {parsed_data['tokenIdentification']} has already been incorporated" - logger.info(rejectComment) - rejected_transaction_history(transaction_data, parsed_data, inputadd, outputlist[0], rejectComment) - pushData_SSEapi(rejectComment) - return 0 - else: - rejectComment = f"NFT incorporation at transaction {transaction_data['txid']} rejected as either the input address is part of a contract address" - logger.info(rejectComment) - rejected_transaction_history(transaction_data, parsed_data, inputadd, outputlist[0], rejectComment) - pushData_SSEapi(rejectComment) - return 0 - -def process_infinite_token_incorporation(parsed_data, transaction_data, blockinfo, inputlist, outputlist, inputadd): - logger.info(f"Processing infinite token incorporation for transaction {transaction_data['txid']}") - - # Ensure that neither the input nor output addresses are contract addresses - if not is_a_contract_address(inputlist[0]) and not is_a_contract_address(outputlist[0]): - # Check if the token already exists in the database - if not check_database_existence('token', {'token_name': f"{parsed_data['tokenIdentification']}"}): - parsed_data['tokenAmount'] = 0 - - # Create a session to manage token incorporation - try: - tokendb_session = create_database_session_orm('token', {'token_name': f"{parsed_data['tokenIdentification']}"}, TokenBase) - - # Add initial token data to the database - tokendb_session.add( - ActiveTable( - address=inputlist[0], - parentid=0, - transferBalance=parsed_data['tokenAmount'], - blockNumber=blockinfo['height'] - ) - ) - tokendb_session.add( - TransferLogs( - sourceFloAddress=inputadd, - destFloAddress=outputlist[0], - transferAmount=parsed_data['tokenAmount'], - sourceId=0, - destinationId=1, - blockNumber=transaction_data['blockheight'], - time=transaction_data['time'], - transactionHash=transaction_data['txid'] - ) - ) - - # Add the transaction history for the token - add_transaction_history( - token_name=parsed_data['tokenIdentification'], - sourceFloAddress=inputadd, - destFloAddress=outputlist[0], - transferAmount=parsed_data['tokenAmount'], - blockNumber=transaction_data['blockheight'], - blockHash=transaction_data['blockhash'], - blocktime=blockinfo['time'], - transactionHash=transaction_data['txid'], - jsonData=json.dumps(transaction_data), - transactionType=parsed_data['type'], - parsedFloData=json.dumps(parsed_data) - ) - - # Add to token address mapping - connection = create_database_connection('system_dbs', {'db_name': 'system'}) - connection.execute("INSERT INTO tokenAddressMapping (tokenAddress, token, transactionHash, blockNumber, blockHash) VALUES (%s, %s, %s, %s, %s)", (inputadd, parsed_data['tokenIdentification'], transaction_data['txid'], transaction_data['blockheight'], transaction_data['blockhash'])) - - - # Add to database type mapping - info_object = {'root_address': inputadd} - connection.execute("INSERT INTO databaseTypeMapping (db_name, db_type, keyword, object_format, blockNumber) VALUES (%s, %s, %s, %s, %s)", (parsed_data['tokenIdentification'], 'infinite-token', '', json.dumps(info_object), transaction_data['blockheight'])) - - - # Commit the session and close connections - updateLatestTransaction(transaction_data, parsed_data, f"{parsed_data['tokenIdentification']}") - tokendb_session.commit() - logger.info(f"Token | Successfully incorporated token {parsed_data['tokenIdentification']} at transaction {transaction_data['txid']}") - - except Exception as e: - logger.error(f"Error during infinite token incorporation: {e}") - tokendb_session.rollback() - return 0 - finally: + # add it to token address to token mapping db table + connection = create_database_connection('system_dbs', {'db_name':'system'}) + connection.execute(f"INSERT INTO tokenAddressMapping (tokenAddress, token, transactionHash, blockNumber, blockHash) VALUES ('{inputadd}', '{parsed_data['tokenIdentification']}', '{transaction_data['txid']}', '{transaction_data['blockheight']}', '{transaction_data['blockhash']}');") + nft_data = {'sha256_hash': f"{parsed_data['nftHash']}"} + connection.execute(f"INSERT INTO databaseTypeMapping (db_name, db_type, keyword, object_format, blockNumber) VALUES ('{parsed_data['tokenIdentification']}', 'nft', '', '{json.dumps(nft_data)}', '{transaction_data['blockheight']}')") connection.close() - tokendb_session.close() - pushData_SSEapi(f"Token | Successfully incorporated token {parsed_data['tokenIdentification']} at transaction {transaction_data['txid']}") - return 1 + updateLatestTransaction(transaction_data, parsed_data, f"{parsed_data['tokenIdentification']}") + pushData_SSEapi(f"NFT | Succesfully incorporated NFT {parsed_data['tokenIdentification']} at transaction {transaction_data['txid']}") + return 1 + else: + rejectComment = f"Transaction {transaction_data['txid']} rejected as an NFT with the name {parsed_data['tokenIdentification']} has already been incorporated" + logger.info(rejectComment) + rejected_transaction_history(transaction_data, parsed_data, inputadd, outputlist[0], rejectComment) + pushData_SSEapi(rejectComment) + return 0 else: - rejectComment = f"Transaction {transaction_data['txid']} rejected as a token with the name {parsed_data['tokenIdentification']} has already been incorporated" + rejectComment = f"NFT incorporation at transaction {transaction_data['txid']} rejected as either the input address is part of a contract address" logger.info(rejectComment) rejected_transaction_history(transaction_data, parsed_data, inputadd, outputlist[0], rejectComment) pushData_SSEapi(rejectComment) return 0 - else: - rejectComment = f"Infinite token incorporation at transaction {transaction_data['txid']} rejected as either the input address or output address is part of a contract address" - logger.info(rejectComment) - rejected_transaction_history(transaction_data, parsed_data, inputadd, outputlist[0], rejectComment) - pushData_SSEapi(rejectComment) - return 0 - - - -# Main processing functions START - -def processTransaction(transaction_data, parsed_data, blockinfo): - - inputlist, outputlist, inputadd = process_flo_checks(transaction_data) - - if inputlist is None or outputlist is None: - return 0 - - logger.info(f"Input address list : {inputlist}") - logger.info(f"Output address list : {outputlist}") - - transaction_data['senderAddress'] = inputlist[0] - transaction_data['receiverAddress'] = outputlist[0] - - # Process transaction based on type - if parsed_data['type'] == 'transfer': - logger.info(f"Transaction {transaction_data['txid']} is of the type transfer") - - if parsed_data['transferType'] == 'token': - return process_token_transfer(parsed_data, transaction_data, blockinfo,inputlist, outputlist, inputadd) - elif parsed_data['transferType'] == 'smartContract': - return process_smart_contract_transfer(parsed_data, transaction_data, blockinfo,inputlist, outputlist, inputadd) - elif parsed_data['transferType'] == 'nft': - return process_nft_transfer(parsed_data, transaction_data, blockinfo,inputlist, outputlist, inputadd) - else: - logger.info(f"Invalid transfer type in transaction {transaction_data['txid']}") - return 0 - - elif parsed_data['type'] == 'tokenIncorporation': - logger.info(f"Transaction {transaction_data['txid']} is of the type tokenIncorporation") - return process_token_incorporation(parsed_data, transaction_data, blockinfo,inputlist, outputlist, inputadd) - - elif parsed_data['type'] == 'smartContractIncorporation': - logger.info(f"Transaction {transaction_data['txid']} is of the type smartContractIncorporation") - return process_smart_contract_incorporation(parsed_data, transaction_data, blockinfo,inputlist, outputlist, inputadd) - - elif parsed_data['type'] == 'smartContractPays': - logger.info(f"Transaction {transaction_data['txid']} is of the type smartContractPays") - return process_smart_contract_pays(parsed_data, transaction_data, blockinfo,inputlist, outputlist, inputadd) - - elif parsed_data['type'] == 'smartContractDeposit': - logger.info(f"Transaction {transaction_data['txid']} is of the type smartContractDeposit") - return process_smart_contract_deposit(parsed_data, transaction_data, blockinfo,inputlist, outputlist, inputadd) - - elif parsed_data['type'] == 'nftIncorporation': - logger.info(f"Transaction {transaction_data['txid']} is of the type nftIncorporation") - return process_nft_incorporation(parsed_data, transaction_data, blockinfo,inputlist, outputlist, inputadd) elif parsed_data['type'] == 'infiniteTokenIncorporation': - logger.info(f"Transaction {transaction_data['txid']} is of the type infiniteTokenIncorporation") - return process_infinite_token_incorporation(parsed_data, transaction_data, blockinfo,inputlist, outputlist, inputadd) + if not is_a_contract_address(inputlist[0]) and not is_a_contract_address(outputlist[0]): + if not check_database_existence('token', {'token_name':f"{parsed_data['tokenIdentification']}"}): + parsed_data['tokenAmount'] = 0 + tokendb_session = create_database_session_orm('token', {'token_name': f"{parsed_data['tokenIdentification']}"}, TokenBase) + tokendb_session.add(ActiveTable(address=inputlist[0], parentid=0, transferBalance=parsed_data['tokenAmount'], blockNumber=blockinfo['height'])) + tokendb_session.add(TransferLogs(sourceFloAddress=inputadd, destFloAddress=outputlist[0], + transferAmount=parsed_data['tokenAmount'], sourceId=0, destinationId=1, + blockNumber=transaction_data['blockheight'], time=transaction_data['time'], + transactionHash=transaction_data['txid'])) + + add_transaction_history(token_name=parsed_data['tokenIdentification'], sourceFloAddress=inputadd, destFloAddress=outputlist[0], transferAmount=parsed_data['tokenAmount'], blockNumber=transaction_data['blockheight'], blockHash=transaction_data['blockhash'], blocktime=blockinfo['time'], transactionHash=transaction_data['txid'], jsonData=json.dumps(transaction_data), transactionType=parsed_data['type'], parsedFloData=json.dumps(parsed_data)) - else: - logger.info(f"Transaction {transaction_data['txid']} rejected as it doesn't belong to any valid type") - return 0 + + # add it to token address to token mapping db table + connection = create_database_connection('system_dbs', {'db_name':'system'}) + connection.execute(f"INSERT INTO tokenAddressMapping (tokenAddress, token, transactionHash, blockNumber, blockHash) VALUES ('{inputadd}', '{parsed_data['tokenIdentification']}', '{transaction_data['txid']}', '{transaction_data['blockheight']}', '{transaction_data['blockhash']}');") + info_object = {'root_address': inputadd} + connection.execute("""INSERT INTO databaseTypeMapping (db_name, db_type, keyword, object_format, blockNumber) VALUES (?, ?, ?, ?, ?)""", (parsed_data['tokenIdentification'], 'infinite-token', '', json.dumps(info_object), transaction_data['blockheight'])) + updateLatestTransaction(transaction_data, parsed_data, f"{parsed_data['tokenIdentification']}") + tokendb_session.commit() + connection.close() + tokendb_session.close() + pushData_SSEapi(f"Token | Succesfully incorporated token {parsed_data['tokenIdentification']} at transaction {transaction_data['txid']}") + return 1 + else: + rejectComment = f"Transaction {transaction_data['txid']} rejected as a token with the name {parsed_data['tokenIdentification']} has already been incorporated" + logger.info(rejectComment) + rejected_transaction_history(transaction_data, parsed_data, inputadd, outputlist[0], rejectComment) + pushData_SSEapi(rejectComment) + return 0 + else: + rejectComment = f"Infinite token incorporation at transaction {transaction_data['txid']} rejected as either the input address is part of a contract address" + logger.info(rejectComment) + rejected_transaction_history(transaction_data, parsed_data, inputadd, outputlist[0], rejectComment) + pushData_SSEapi(rejectComment) + return 0 - return 1 def scanBlockchain(): # Read start block no @@ -2983,7 +2487,7 @@ def scanBlockchain(): processBlock(blockindex=blockindex) # At this point the script has updated to the latest block - # Now we connect to Blockbook's websocket API to get information about the latest blocks + # Now we connect to flosight's websocket API to get information about the latest blocks def switchNeturl(currentneturl): # Use modulo operation to simplify the logic @@ -2992,9 +2496,9 @@ def switchNeturl(currentneturl): def reconnectWebsocket(socket_variable): - # Switch a to different Blockbook + # Switch a to different flosight # neturl = switchNeturl(neturl) - # Connect to Blockbook websocket to get data on new incoming blocks + # Connect to Flosight websocket to get data on new incoming blocks i=0 newurl = serverlist[0] while(not socket_variable.connected): @@ -3054,27 +2558,11 @@ async def connect_to_websocket(uri): config = configparser.ConfigParser() config.read('config.ini') -class MySQLConfig: - def __init__(self): - self.username = config['MYSQL']['USERNAME'] - self.password = config['MYSQL']['PASSWORD'] - self.host = config['MYSQL']['HOST'] - self.database_prefix = config['MYSQL']['DATABASE_PREFIX'] - -mysql_config = MySQLConfig() - logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) - -# Suppress SQLAlchemy engine logs -logging.getLogger('sqlalchemy.engine').setLevel(logging.WARNING) -logging.getLogger('sqlalchemy.pool').setLevel(logging.WARNING) -logging.getLogger('sqlalchemy.dialects').setLevel(logging.WARNING) - formatter = logging.Formatter('%(asctime)s:%(name)s:%(message)s') -DATA_PATH = os.path.dirname(os.path.abspath(__file__)) -file_handler = logging.FileHandler(os.path.join(DATA_PATH, 'tracking.log')) +file_handler = logging.FileHandler(os.path.join(config['DEFAULT']['DATA_PATH'],'tracking.log')) file_handler.setLevel(logging.INFO) file_handler.setFormatter(formatter) @@ -3088,7 +2576,7 @@ logger.addHandler(stream_handler) # Rule 1 - Read command line arguments to reset the databases as blank # Rule 2 - Read config to set testnet/mainnet # Rule 3 - Set flo blockexplorer location depending on testnet or mainnet -# Rule 4 - Set the local flo-cli path depending on testnet or mainnet ( removed this feature | Blockbooks are the only source ) +# Rule 4 - Set the local flo-cli path depending on testnet or mainnet ( removed this feature | Flosights are the only source ) # Rule 5 - Set the block number to scan from @@ -3096,11 +2584,16 @@ logger.addHandler(stream_handler) parser = argparse.ArgumentParser(description='Script tracks RMT using FLO data on the FLO blockchain - https://flo.cash') parser.add_argument('-r', '--reset', nargs='?', const=1, type=int, help='Purge existing db and rebuild it from scratch') parser.add_argument('-rb', '--rebuild', nargs='?', const=1, type=int, help='Rebuild it') -parser.add_argument("--keywords", nargs="+", help="List of keywords to filter transactions during rebuild.", required=False) parser.add_argument("--testnet", action="store_true", help="Use the testnet URL") - args = parser.parse_args() +dirpath = os.path.join(config['DEFAULT']['DATA_PATH'], 'tokens') +if not os.path.isdir(dirpath): + os.mkdir(dirpath) +dirpath = os.path.join(config['DEFAULT']['DATA_PATH'], 'smartContracts') +if not os.path.isdir(dirpath): + os.mkdir(dirpath) + # Read configuration # todo - write all assertions to make sure default configs are right @@ -3112,15 +2605,15 @@ if (config['DEFAULT']['NET'] != 'mainnet') and (config['DEFAULT']['NET'] != 'tes # Specify ADMIN ID serverlist = None if config['DEFAULT']['NET'] == 'mainnet': - serverlist = config['DEFAULT']['MAINNET_BLOCKBOOK_SERVER_LIST'] + serverlist = config['DEFAULT']['MAINNET_FLOSIGHT_SERVER_LIST'] APP_ADMIN = 'FNcvkz9PZNZM3HcxM1XTrVL4tgivmCkHp9' websocket_uri = get_websocket_uri(testnet=False) elif config['DEFAULT']['NET'] == 'testnet': - serverlist = config['DEFAULT']['TESTNET_BLOCKBOOK_SERVER_LIST'] + serverlist = config['DEFAULT']['TESTNET_FLOSIGHT_SERVER_LIST'] APP_ADMIN = 'oWooGLbBELNnwq8Z5YmjoVjw8GhBGH3qSP' websocket_uri = get_websocket_uri(testnet=True) serverlist = serverlist.split(',') -neturl = config['DEFAULT']['BLOCKBOOK_NETURL'] +neturl = config['DEFAULT']['FLOSIGHT_NETURL'] api_url = neturl tokenapi_sse_url = config['DEFAULT']['TOKENAPI_SSE_URL'] API_VERIFY = config['DEFAULT']['API_VERIFY'] @@ -3137,6 +2630,14 @@ IGNORE_BLOCK_LIST = [int(s) for s in IGNORE_BLOCK_LIST] IGNORE_TRANSACTION_LIST = config['DEFAULT']['IGNORE_TRANSACTION_LIST'].split(',') +def create_dir_if_not_exist(dir_path, reset = False): + if os.path.exists(dir_path): + if reset: + shutil.rmtree(dir_path) + os.mkdir(dir_path) + else: + os.mkdir(dir_path) + def init_system_db(startblock): # Initialize system.db session = create_database_session_orm('system_dbs', {'db_name': "system"}, SystemBase) @@ -3150,251 +2651,30 @@ def init_lastestcache_db(): session.commit() session.close() -def init_storage_if_not_exist(reset=False, exclude_backups=False): - """ - Initialize or reset the storage by creating or dropping/recreating system and cache databases. - When reset=True, also drops all token and smart contract databases. +def init_storage_if_not_exist(reset = False): + + token_dir_path = os.path.join(config['DEFAULT']['DATA_PATH'], 'tokens') + create_dir_if_not_exist(token_dir_path, reset) - Args: - reset (bool): If True, resets the databases by dropping and recreating them. - exclude_backups (bool): If True, skips dropping databases with '_backup' in their names. - """ - def ensure_database_exists(database_name, init_function=None): - engine = create_engine(f"mysql+pymysql://{mysql_config.username}:{mysql_config.password}@{mysql_config.host}/", echo=False) - with engine.connect() as connection: - # Drop and recreate the database if reset is True - if reset: - if exclude_backups and "_backup" in database_name: - logger.info(f"Skipping reset for backup database '{database_name}'.") - else: - connection.execute(f"DROP DATABASE IF EXISTS `{database_name}`") - logger.info(f"Database '{database_name}' dropped for reset.") - logger.info(f"Rechecking database '{database_name}' exists.") - connection.execute(f"CREATE DATABASE IF NOT EXISTS `{database_name}`") - logger.info(f"Database '{database_name}' ensured to exist.") - # Run initialization function if provided - if init_function: - init_function() - - def drop_token_and_smartcontract_databases(): - """ - Drop all token and smart contract databases when reset is True. - Token databases: Named with prefix {prefix}_{token_name}_db. - Smart contract databases: Named with prefix {prefix}_{contract_name}_{contract_address}_db. - """ - engine = create_engine(f"mysql+pymysql://{mysql_config.username}:{mysql_config.password}@{mysql_config.host}/", echo=False) - with engine.connect() as connection: - logger.info("Dropping all token and smart contract databases as part of reset.") - result = connection.execute("SHOW DATABASES") - databases = [row[0] for row in result.fetchall()] - for db_name in databases: - if db_name.startswith(f"{mysql_config.database_prefix}_") and "_db" in db_name: - if exclude_backups and "_backup" in db_name: - logger.info(f"Skipping backup database '{db_name}'.") - continue - if not db_name.endswith("system_db") and not db_name.endswith("latestCache_db"): - connection.execute(f"DROP DATABASE IF EXISTS `{db_name}`") - logger.info(f"Dropped database '{db_name}'.") - - if reset: - # Drop all token and smart contract databases - drop_token_and_smartcontract_databases() - - # Initialize the system database - system_db_name = f"{mysql_config.database_prefix}_system_db" - ensure_database_exists(system_db_name, lambda: init_system_db(int(config['DEFAULT']['START_BLOCK']))) - - # Initialize the latest cache database - latest_cache_db_name = f"{mysql_config.database_prefix}_latestCache_db" - ensure_database_exists(latest_cache_db_name, init_lastestcache_db) - - -def fetch_current_block_height(): - """ - Fetch the current block height from the blockchain using the `newMultiRequest` function. - - :return: The current block height as an integer. - """ - current_index = -1 - while current_index == -1: # Keep trying until a valid block height is retrieved - try: - response = newMultiRequest('blocks') # Make the API call - current_index = response['backend']['blocks'] # Extract block height from response - logger.info(f"Current block height fetched: {current_index}") - except Exception as e: - logger.error(f"Error fetching current block height: {e}") - logger.info("Program will wait for 1 second and try to reconnect.") - time.sleep(1) # Wait before retrying - - return current_index - - -def backup_database_to_temp(original_db, backup_db): - """ - Back up the original database schema and data into a new temporary backup database. - - :param original_db: Name of the original database. - :param backup_db: Name of the backup database. - :return: True if successful, False otherwise. - """ - try: - # Ensure the backup database exists - engine = create_engine(f"mysql+pymysql://{mysql_config.username}:{mysql_config.password}@{mysql_config.host}/", echo=False) - with engine.connect() as connection: - logger.info(f"Creating backup database '{backup_db}'...") - connection.execute(f"DROP DATABASE IF EXISTS `{backup_db}`") - connection.execute(f"CREATE DATABASE `{backup_db}`") - logger.info(f"Temporary backup database '{backup_db}' created successfully.") + smart_contract_dir_path = os.path.join(config['DEFAULT']['DATA_PATH'], 'smartContracts') + create_dir_if_not_exist(smart_contract_dir_path, reset) + + system_db_path = os.path.join(config['DEFAULT']['DATA_PATH'], 'system.db') + if os.path.exists(system_db_path): + if reset: + os.remove(system_db_path) + init_system_db(int(config['DEFAULT']['START_BLOCK'])) + else: + init_system_db(int(config['DEFAULT']['START_BLOCK'])) - # Verify database creation - result = connection.execute(f"SHOW DATABASES LIKE '{backup_db}'").fetchone() - if not result: - raise RuntimeError(f"Backup database '{backup_db}' was not created successfully.") - except Exception as e: - logger.error(f"Failed to create backup database '{backup_db}': {e}") - return False - - try: - # Reflect original database schema - original_engine = create_engine(f"mysql+pymysql://{mysql_config.username}:{mysql_config.password}@{mysql_config.host}/{original_db}", echo=False) - backup_engine = create_engine(f"mysql+pymysql://{mysql_config.username}:{mysql_config.password}@{mysql_config.host}/{backup_db}", echo=False) - - logger.info(f"Connecting to original database: {original_engine.url}") - logger.info(f"Connecting to backup database: {backup_engine.url}") - - from sqlalchemy.schema import MetaData - metadata = MetaData() - metadata.reflect(bind=original_engine) - metadata.create_all(bind=backup_engine) - - SessionOriginal = sessionmaker(bind=original_engine) - SessionBackup = sessionmaker(bind=backup_engine) - session_original = SessionOriginal() - session_backup = SessionBackup() - - for table in metadata.sorted_tables: - table_name = table.name - logger.info(f"Copying data from table '{table_name}'...") - data = session_original.execute(table.select()).fetchall() - - if data: - column_names = [column.name for column in table.columns] - data_dicts = [dict(zip(column_names, row)) for row in data] - try: - session_backup.execute(table.insert(), data_dicts) - session_backup.commit() - except Exception as e: - logger.error(f"Error copying data from table '{table_name}': {e}") - logger.debug(f"Data causing the issue: {data_dicts}") - raise - - session_original.close() - session_backup.close() - logger.info(f"Data successfully backed up to '{backup_db}'.") - return True - - except Exception as e: - logger.error(f"Error copying data to backup database '{backup_db}': {e}") - return False - - -def backup_and_rebuild_latestcache(keywords=None): - """ - Back up the current databases, reset all databases, and rebuild the latestCache database. - - :param keywords: List of keywords to filter transactions. If None, processes all transactions. - """ - # Define database names - latestcache_db = f"{mysql_config.database_prefix}_latestCache_db" - latestcache_backup_db = f"{latestcache_db}_backup" - system_db = f"{mysql_config.database_prefix}_system_db" - system_backup_db = f"{system_db}_backup" - - # Step 1: Create backups - logger.info("Creating backups of the latestCache and system databases...") - if not backup_database_to_temp(latestcache_db, latestcache_backup_db): - logger.error(f"Failed to create backup for latestCache database '{latestcache_db}'.") - return - if not backup_database_to_temp(system_db, system_backup_db): - logger.error(f"Failed to create backup for system database '{system_db}'.") - return - - # Step 2: Reset databases (skip backup databases during reset) - logger.info("Resetting all databases except backups...") - try: - init_storage_if_not_exist(reset=True, exclude_backups=True) # Pass a flag to avoid dropping backup databases - except Exception as e: - logger.error(f"Failed to reset databases: {e}") - return - - # Step 3: Extract last block scanned from backup - try: - logger.info("Extracting last block scanned from backup system database...") - backup_engine = create_engine(f"mysql+pymysql://{mysql_config.username}:{mysql_config.password}@{mysql_config.host}/{system_backup_db}", echo=False) - with sessionmaker(bind=backup_engine)() as session: - last_block_scanned_entry = session.query(SystemData).filter_by(attribute='lastblockscanned').first() - if not last_block_scanned_entry: - raise ValueError("No 'lastblockscanned' entry found in backup system database.") - last_block_scanned = int(last_block_scanned_entry.value) - logger.info(f"Last block scanned retrieved: {last_block_scanned}") - except Exception as e: - logger.error(f"Failed to retrieve lastblockscanned from backup system database: {e}") - return - - # Step 4: Reprocess blocks from the backup - try: - logger.info("Starting reprocessing of blocks from backup...") - backup_engine = create_engine(f"mysql+pymysql://{mysql_config.username}:{mysql_config.password}@{mysql_config.host}/{latestcache_backup_db}", echo=False) - with sessionmaker(bind=backup_engine)() as session: - stored_blocks = session.query(LatestBlocks).order_by(LatestBlocks.blockNumber).all() - if not stored_blocks: - logger.warning("No blocks found in backed-up latestCache database. Aborting rebuild.") - return - - for block_entry in stored_blocks: - try: - blockinfo = json.loads(block_entry.jsonData) - block_number = blockinfo.get("height", block_entry.blockNumber) - block_hash = blockinfo.get("hash", block_entry.blockHash) - - logger.info(f"Reprocessing block {block_number} with hash {block_hash}...") - processBlock(blockindex=block_number, blockhash=block_hash, blockinfo=blockinfo, keywords=keywords) - except Exception as e: - logger.error(f"Error processing block {block_entry.blockNumber}: {e}") - logger.info("Rebuild of latestCache database completed successfully.") - except Exception as e: - logger.error(f"Error during rebuild of latestCache database from backup: {e}") - return - - # Step 5: Update lastblockscanned in the new system database - try: - logger.info("Updating lastblockscanned in the new system database...") - engine = create_engine(f"mysql+pymysql://{mysql_config.username}:{mysql_config.password}@{mysql_config.host}/{system_db}", echo=False) - with sessionmaker(bind=engine)() as session: - entry = session.query(SystemData).filter_by(attribute='lastblockscanned').first() - if entry: - entry.value = str(last_block_scanned) - else: - session.add(SystemData(attribute='lastblockscanned', value=str(last_block_scanned))) - session.commit() - logger.info(f"Updated lastblockscanned to {last_block_scanned}.") - except Exception as e: - logger.error(f"Failed to update lastblockscanned: {e}") - return - - # Step 6: Process remaining blocks - try: - logger.info("Processing remaining blocks...") - current_block_height = fetch_current_block_height() - for blockindex in range(last_block_scanned + 1, current_block_height + 1): - if blockindex in IGNORE_BLOCK_LIST: - continue - logger.info(f"Processing block {blockindex} from the blockchain...") - processBlock(blockindex=blockindex, keywords=keywords) - except Exception as e: - logger.error(f"Error processing remaining blocks: {e}") - - + + latestCache_db_path = os.path.join(config['DEFAULT']['DATA_PATH'], 'latestCache.db') + if os.path.exists(latestCache_db_path): + if reset: + os.remove(latestCache_db_path) + init_lastestcache_db() + else: + init_lastestcache_db() # Delete database and smartcontract directory if reset is set to 1 if args.reset == 1: @@ -3403,26 +2683,17 @@ if args.reset == 1: else: init_storage_if_not_exist() -# Backup and rebuild latestCache and system.db if rebuild flag is set -if args.rebuild == 1: - # Use the unified rebuild function with or without keywords - backup_and_rebuild_latestcache(keywords=args.keywords if args.keywords else None) - logger.info("Rebuild completed. Exiting...") - sys.exit(0) - # Determine API source for block and transaction information if __name__ == "__main__": # MAIN LOGIC STARTS # scan from the latest block saved locally to latest network block - scanBlockchain() logger.debug("Completed first scan") # At this point the script has updated to the latest block - # Now we connect to Blockbook's websocket API to get information about the latest blocks - # Neturl is the URL for Blockbook API whose websocket endpoint is being connected to + # Now we connect to flosight's websocket API to get information about the latest blocks + # Neturl is the URL for Flosight API whose websocket endpoint is being connected to asyncio.get_event_loop().run_until_complete(connect_to_websocket(websocket_uri)) -