From 5449ee83a51a7e96be2ce9d4a8ec3e4f3b051a5a Mon Sep 17 00:00:00 2001 From: Sai Raj <39055732+sairajzero@users.noreply.github.com> Date: Wed, 10 Jul 2024 04:15:03 -0400 Subject: [PATCH] Exception handle - Adding exception handle for all fetch and db connections within a loop with retry timeout - blocked all sys.exit to prevent system from shutting down. instead retry after 1 hr or 30 mins --- tracktokens_smartcontracts.py | 386 ++++++++++++++++++++++------------ 1 file changed, 246 insertions(+), 140 deletions(-) diff --git a/tracktokens_smartcontracts.py b/tracktokens_smartcontracts.py index b7cee97..a2164d7 100755 --- a/tracktokens_smartcontracts.py +++ b/tracktokens_smartcontracts.py @@ -5,7 +5,7 @@ import logging import os import shutil import sys -import pyflo +#import pyflo import requests from sqlalchemy import create_engine, func, and_ from sqlalchemy.orm import sessionmaker @@ -25,6 +25,11 @@ import pdb from util_rollback import rollback_to_block +RETRY_TIMEOUT_LONG = 30 * 60 # 30 mins +RETRY_TIMEOUT_SHORT = 60 # 1 min +DB_RETRY_TIMEOUT = 60 # 60 seconds + + def newMultiRequest(apicall): current_server = serverlist[0] while True: @@ -75,7 +80,7 @@ def process_committee_flodata(flodata): finally: return flo_address_list - +""" ?NOT USED? def refresh_committee_list_old(admin_flo_id, api_url, blocktime): response = requests.get(f'{api_url}api/v1/address/{admin_flo_id}', verify=API_VERIFY) if response.status_code == 200: @@ -97,6 +102,7 @@ def refresh_committee_list_old(admin_flo_id, api_url, blocktime): except: continue return committee_list +""" def refresh_committee_list(admin_flo_id, api_url, blocktime): @@ -115,12 +121,18 @@ def refresh_committee_list(admin_flo_id, api_url, blocktime): pass def send_api_request(url): - response = requests.get(url, verify=API_VERIFY) - if response.status_code == 200: - return response.json() - else: - logger.info('Response from the Flosight API failed') - sys.exit(0) + while True: + try: + response = requests.get(url, verify=API_VERIFY) + if response.status_code == 200: + return response.json() + else: + logger.info(f'Response from the Flosight API failed. Retry in {RETRY_TIMEOUT_SHORT}s') + #sys.exit(0) + time.sleep(RETRY_TIMEOUT_SHORT) + except: + logger.info(f'Fetch from the Flosight API failed. Retry in {RETRY_TIMEOUT_LONG}s...') + time.sleep(RETRY_TIMEOUT_LONG) url = f'{api_url}api/v1/address/{admin_flo_id}?details=txs' response = send_api_request(url) @@ -251,30 +263,12 @@ def delete_contract_database(parameters): def add_transaction_history(token_name, sourceFloAddress, destFloAddress, transferAmount, blockNumber, blockHash, blocktime, transactionHash, jsonData, transactionType, parsedFloData): - session = create_database_session_orm('token', {'token_name': token_name}, TokenBase) - blockchainReference = neturl + 'tx/' + transactionHash - session.add(TransactionHistory( - sourceFloAddress=sourceFloAddress, - destFloAddress=destFloAddress, - transferAmount=transferAmount, - blockNumber=blockNumber, - blockHash=blockHash, - time=blocktime, - transactionHash=transactionHash, - blockchainReference=blockchainReference, - jsonData=jsonData, - transactionType=transactionType, - parsedFloData=parsedFloData - )) - session.commit() - session.close() - - -def add_contract_transaction_history(contract_name, contract_address, transactionType, transactionSubType, sourceFloAddress, destFloAddress, transferAmount, blockNumber, blockHash, blocktime, transactionHash, jsonData, parsedFloData): - session = create_database_session_orm('smart_contract', {'contract_name': f"{contract_name}", 'contract_address': f"{contract_address}"}, ContractBase) - blockchainReference = neturl + 'tx/' + transactionHash - session.add(ContractTransactionHistory(transactionType=transactionType, - sourceFloAddress=sourceFloAddress, + while True: + try: + session = create_database_session_orm('token', {'token_name': token_name}, TokenBase) + blockchainReference = neturl + 'tx/' + transactionHash + session.add(TransactionHistory( + sourceFloAddress=sourceFloAddress, destFloAddress=destFloAddress, transferAmount=transferAmount, blockNumber=blockNumber, @@ -283,52 +277,93 @@ def add_contract_transaction_history(contract_name, contract_address, transactio transactionHash=transactionHash, blockchainReference=blockchainReference, jsonData=jsonData, + transactionType=transactionType, parsedFloData=parsedFloData - )) - session.commit() - session.close() + )) + session.commit() + session.close() + break + except: + logger.info(f"Unable to connect to 'token({token_name})' database... retrying in {DB_RETRY_TIMEOUT} seconds") + time.sleep(DB_RETRY_TIMEOUT) + + +def add_contract_transaction_history(contract_name, contract_address, transactionType, transactionSubType, sourceFloAddress, destFloAddress, transferAmount, blockNumber, blockHash, blocktime, transactionHash, jsonData, parsedFloData): + while True: + try: + session = create_database_session_orm('smart_contract', {'contract_name': f"{contract_name}", 'contract_address': f"{contract_address}"}, ContractBase) + blockchainReference = neturl + 'tx/' + transactionHash + session.add(ContractTransactionHistory(transactionType=transactionType, + sourceFloAddress=sourceFloAddress, + destFloAddress=destFloAddress, + transferAmount=transferAmount, + blockNumber=blockNumber, + blockHash=blockHash, + time=blocktime, + transactionHash=transactionHash, + blockchainReference=blockchainReference, + jsonData=jsonData, + parsedFloData=parsedFloData + )) + session.commit() + session.close() + break + except: + logger.info(f"Unable to connect to 'smart_contract({contract_name})' database... retrying in {DB_RETRY_TIMEOUT} seconds") + time.sleep(DB_RETRY_TIMEOUT) def rejected_transaction_history(transaction_data, parsed_data, sourceFloAddress, destFloAddress, rejectComment): - session = create_database_session_orm('system_dbs', {'db_name': "system"}, TokenBase) - blockchainReference = neturl + 'tx/' + transaction_data['txid'] - session.add(RejectedTransactionHistory(tokenIdentification=parsed_data['tokenIdentification'], - sourceFloAddress=sourceFloAddress, destFloAddress=destFloAddress, - transferAmount=parsed_data['tokenAmount'], - blockNumber=transaction_data['blockheight'], - blockHash=transaction_data['blockhash'], - time=transaction_data['time'], - transactionHash=transaction_data['txid'], - blockchainReference=blockchainReference, - jsonData=json.dumps(transaction_data), - rejectComment=rejectComment, - transactionType=parsed_data['type'], - parsedFloData=json.dumps(parsed_data) - )) - session.commit() - session.close() + while True: + try: + session = create_database_session_orm('system_dbs', {'db_name': "system"}, TokenBase) + blockchainReference = neturl + 'tx/' + transaction_data['txid'] + session.add(RejectedTransactionHistory(tokenIdentification=parsed_data['tokenIdentification'], + sourceFloAddress=sourceFloAddress, destFloAddress=destFloAddress, + transferAmount=parsed_data['tokenAmount'], + blockNumber=transaction_data['blockheight'], + blockHash=transaction_data['blockhash'], + time=transaction_data['time'], + transactionHash=transaction_data['txid'], + blockchainReference=blockchainReference, + jsonData=json.dumps(transaction_data), + rejectComment=rejectComment, + transactionType=parsed_data['type'], + parsedFloData=json.dumps(parsed_data) + )) + session.commit() + session.close() + break + except: + logger.info(f"Unable to connect to 'system' database... retrying in {DB_RETRY_TIMEOUT} seconds") + time.sleep(DB_RETRY_TIMEOUT) def rejected_contract_transaction_history(transaction_data, parsed_data, transactionType, contractAddress, sourceFloAddress, destFloAddress, rejectComment): - session = create_database_session_orm('system_dbs', {'db_name': "system"}, SystemBase) - blockchainReference = neturl + 'tx/' + transaction_data['txid'] - session.add(RejectedContractTransactionHistory(transactionType=transactionType, - contractName=parsed_data['contractName'], - contractAddress=contractAddress, - sourceFloAddress=sourceFloAddress, - destFloAddress=destFloAddress, - transferAmount=None, - blockNumber=transaction_data['blockheight'], - blockHash=transaction_data['blockhash'], - time=transaction_data['time'], - transactionHash=transaction_data['txid'], - blockchainReference=blockchainReference, - jsonData=json.dumps(transaction_data), - rejectComment=rejectComment, - parsedFloData=json.dumps(parsed_data))) - session.commit() - session.close() - + while True: + try: + session = create_database_session_orm('system_dbs', {'db_name': "system"}, SystemBase) + blockchainReference = neturl + 'tx/' + transaction_data['txid'] + session.add(RejectedContractTransactionHistory(transactionType=transactionType, + contractName=parsed_data['contractName'], + contractAddress=contractAddress, + sourceFloAddress=sourceFloAddress, + destFloAddress=destFloAddress, + transferAmount=None, + blockNumber=transaction_data['blockheight'], + blockHash=transaction_data['blockhash'], + time=transaction_data['time'], + transactionHash=transaction_data['txid'], + blockchainReference=blockchainReference, + jsonData=json.dumps(transaction_data), + rejectComment=rejectComment, + parsedFloData=json.dumps(parsed_data))) + session.commit() + session.close() + break + except: + logger.info(f"Unable to connect to 'system' database... retrying in {DB_RETRY_TIMEOUT} seconds") + time.sleep(DB_RETRY_TIMEOUT) def convert_datetime_to_arrowobject(expiryTime): expirytime_split = expiryTime.split(' ') @@ -349,19 +384,25 @@ def convert_datetime_to_arrowobject_regex(expiryTime): def is_a_contract_address(floAddress): - # check contract address mapping db if the address is present, and return True or False based on that - system_db = create_database_session_orm('system_dbs', {'db_name':'system'}, SystemBase) + while True: + try: + # check contract address mapping db if the address is present, and return True or False based on that + session = create_database_session_orm('system_dbs', {'db_name':'system'}, SystemBase) - # contract_number = system_db.query(func.sum(ContractAddressMapping.contractAddress)).filter(ContractAddressMapping.contractAddress == floAddress).all()[0][0] - query_data = system_db.query(ContractAddressMapping.contractAddress).filter(ContractAddressMapping.contractAddress == floAddress).all() - contract_number = sum(Decimal(f"{amount[0]}") if amount[0] is not None else Decimal(0) for amount in query_data) - - if contract_number is None or contract_number==0: - return False - else: - return True + # contract_number = session.query(func.sum(ContractAddressMapping.contractAddress)).filter(ContractAddressMapping.contractAddress == floAddress).all()[0][0] + query_data = session.query(ContractAddressMapping.contractAddress).filter(ContractAddressMapping.contractAddress == floAddress).all() + contract_number = sum(Decimal(f"{amount[0]}") if amount[0] is not None else Decimal(0) for amount in query_data) + session.close() + if contract_number is None or contract_number==0: + return False + else: + return True + except: + logger.info(f"Unable to connect to 'system' database... retrying in {DB_RETRY_TIMEOUT} seconds") + time.sleep(DB_RETRY_TIMEOUT) +""" ? NOT USED ? def fetchDynamicSwapPrice_old(contractStructure, transaction_data, blockinfo): oracle_address = contractStructure['oracle_address'] # fetch transactions from the blockchain where from address : oracle-address... to address: contract address @@ -403,7 +444,7 @@ def fetchDynamicSwapPrice_old(contractStructure, transaction_data, blockinfo): else: logger.info('API error fetchDynamicSwapPrice') sys.exit(0) - +""" def fetchDynamicSwapPrice(contractStructure, blockinfo): oracle_address = contractStructure['oracle_address'] @@ -414,39 +455,46 @@ def fetchDynamicSwapPrice(contractStructure, blockinfo): latest_param = 'true' mempool_param = 'false' init_id = None - response = requests.get(f'{api_url}api/v1/address/{oracle_address}?details=txs', verify=API_VERIFY) - if response.status_code == 200: - response = response.json() - if len(response['txs']) == 0: - return float(contractStructure['price']) - else: - for transaction in response['txs']: - if 'floData' in transaction.keys(): - floData = transaction['floData'] + while True: + try: + response = requests.get(f'{api_url}api/v1/address/{oracle_address}?details=txs', verify=API_VERIFY) + if response.status_code == 200: + response = response.json() + if len(response['txs']) == 0: + return float(contractStructure['price']) else: - floData = '' - # If the blocktime of the transaction is < than the current block time - if transaction['time'] < blockinfo['time']: - # Check if flodata is in the format we are looking for - # ie. {"price-update":{"contract-name": "", "contract-address": "", "price": 3}} - # and receiver address should be contractAddress - try: - sender_address, receiver_address = find_sender_receiver(transaction) - assert sender_address == oracle_address - assert receiver_address == contractStructure['contractAddress'] - floData = json.loads(floData) - # Check if the contract name and address are right - assert floData['price-update']['contract-name'] == contractStructure['contractName'] - assert floData['price-update']['contract-address'] == contractStructure['contractAddress'] - return float(floData['price-update']['price']) - except: - continue - else: - continue - else: - logger.info('API error fetchDynamicSwapPrice') - sys.exit(0) - + for transaction in response['txs']: + if 'floData' in transaction.keys(): + floData = transaction['floData'] + else: + floData = '' + # If the blocktime of the transaction is < than the current block time + if transaction['time'] < blockinfo['time']: + # Check if flodata is in the format we are looking for + # ie. {"price-update":{"contract-name": "", "contract-address": "", "price": 3}} + # and receiver address should be contractAddress + try: + sender_address, receiver_address = find_sender_receiver(transaction) + assert sender_address == oracle_address + assert receiver_address == contractStructure['contractAddress'] + floData = json.loads(floData) + # Check if the contract name and address are right + assert floData['price-update']['contract-name'] == contractStructure['contractName'] + assert floData['price-update']['contract-address'] == contractStructure['contractAddress'] + return float(floData['price-update']['price']) + except: + continue + else: + continue + break + else: + logger.info(f'API error fetchDynamicSwapPrice. Retry in {RETRY_TIMEOUT_LONG}s...') + #sys.exit(0) + time.sleep(RETRY_TIMEOUT_LONG) + except: + logger.info(f'API error fetchDynamicSwapPrice. Retry in {RETRY_TIMEOUT_LONG}s...') + time.sleep(RETRY_TIMEOUT_LONG) + return float(contractStructure['price']) @@ -462,7 +510,9 @@ def processBlock(blockindex=None, blockhash=None): logger.info(f"API call block-index/{blockindex} failed to give proper response. Retrying.") blockinfo = newMultiRequest(f"block/{blockhash}") - + + #TODO: Check for reorg in here + # Check and perform operations which do not require blockchain intervention checkLocal_expiry_trigger_deposit(blockinfo) @@ -502,17 +552,27 @@ def processBlock(blockindex=None, blockhash=None): tempinfo.remove(tx) blockinfo['txs'] = tempinfo updateLatestBlock(blockinfo) - - session = create_database_session_orm('system_dbs', {'db_name': "system"}, SystemBase) - entry = session.query(SystemData).filter(SystemData.attribute == 'lastblockscanned').all()[0] - entry.value = str(blockinfo['height']) - session.commit() - session.close() + + while True: + try: + session = create_database_session_orm('system_dbs', {'db_name': "system"}, SystemBase) + entry = session.query(SystemData).filter(SystemData.attribute == 'lastblockscanned').all()[0] + entry.value = str(blockinfo['height']) + session.commit() + session.close() + except: + logger.info(f"Unable to connect to 'system' database... retrying in {DB_RETRY_TIMEOUT} seconds") + time.sleep(DB_RETRY_TIMEOUT) def updateLatestTransaction(transactionData, parsed_data, db_reference, transactionType=None ): # connect to latest transaction db - conn = create_database_connection('latest_cache', {'db_name':"latestCache"}) + while True: + try: + conn = create_database_connection('latest_cache', {'db_name':"latestCache"}) + break + except: + time.sleep(DB_RETRY_TIMEOUT) if transactionType is None: transactionType = parsed_data['type'] conn.execute("INSERT INTO latestTransactions(transactionHash, blockNumber, jsonData, transactionType, parsedFloData, db_reference) VALUES (?,?,?,?,?,?)", (transactionData['txid'], transactionData['blockheight'], json.dumps(transactionData), transactionType, json.dumps(parsed_data), db_reference)) @@ -522,7 +582,12 @@ def updateLatestTransaction(transactionData, parsed_data, db_reference, transact def updateLatestBlock(blockData): # connect to latest block db - conn = create_database_connection('latest_cache', {'db_name':"latestCache"}) + while True: + try: + conn = create_database_connection('latest_cache', {'db_name':"latestCache"}) + break + except: + time.sleep(DB_RETRY_TIMEOUT) conn.execute('INSERT INTO latestBlocks(blockNumber, blockHash, jsonData) VALUES (?,?,?)', (blockData['height'], blockData['hash'], json.dumps(blockData))) #conn.commit() conn.close() @@ -551,7 +616,13 @@ def transferToken(tokenIdentification, tokenAmount, inputAddress, outputAddress, except: logger.info("This is a critical error. Please report to developers") - session = create_database_session_orm('token', {'token_name': f"{tokenIdentification}"}, TokenBase) + while True: + try: + session = create_database_session_orm('token', {'token_name': f"{tokenIdentification}"}, TokenBase) + break + except: + time.sleep(DB_RETRY_TIMEOUT) + tokenAmount = float(tokenAmount) if isInfiniteToken == True: # Make new entry @@ -777,13 +848,23 @@ def check_contract_status(contractName, contractAddress): # Status of the contract is at 2 tables in system.db # activecontracts and time_actions # select the last entry form the colum - connection = create_database_connection('system_dbs') + while True: + try: + connection = create_database_connection('system_dbs') + break + except: + time.sleep(DB_RETRY_TIMEOUT) contract_status = connection.execute(f'SELECT status FROM time_actions WHERE id=(SELECT MAX(id) FROM time_actions WHERE contractName="{contractName}" AND contractAddress="{contractAddress}")').fetchall() return contract_status[0][0] def close_expire_contract(contractStructure, contractStatus, transactionHash, blockNumber, blockHash, incorporationDate, expiryDate, closeDate, trigger_time, trigger_activity, contractName, contractAddress, contractType, tokens_db, parsed_data, blockHeight): - connection = create_database_connection('system_dbs', {'db_name':'system'}) + while True: + try: + connection = create_database_connection('system_dbs', {'db_name':'system'}) + break + except: + time.sleep(DB_RETRY_TIMEOUT) connection.execute('INSERT INTO activecontracts VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)', (None, contractStructure['contractName'], contractStructure['contractAddress'], contractStatus, contractStructure['tokenIdentification'], contractStructure['contractType'], transactionHash, blockNumber, blockHash, incorporationDate, expiryDate, closeDate)) connection.execute('INSERT INTO time_actions VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)', (None, trigger_time, trigger_activity, contractStatus, contractName, contractAddress, contractType, tokens_db, parsed_data, transactionHash, blockHeight)) connection.close() @@ -804,7 +885,13 @@ def return_active_deposits(session): def checkLocal_expiry_trigger_deposit(blockinfo): # Connect to system.db with a session - systemdb_session = create_database_session_orm('system_dbs', {'db_name':'system'}, SystemBase) + while True: + try: + systemdb_session = create_database_session_orm('system_dbs', {'db_name':'system'}, SystemBase) + + break + except: + time.sleep(DB_RETRY_TIMEOUT) timeactions_tx_hashes = [] active_contracts = return_active_contracts(systemdb_session) active_deposits = return_active_deposits(systemdb_session) @@ -821,15 +908,23 @@ def checkLocal_expiry_trigger_deposit(blockinfo): tx_type = 'trigger' data = [blockinfo['hash'], blockinfo['height'], blockinfo['time'], blockinfo['size'], tx_type] - response = requests.get(f'https://stdops.ranchimall.net/hash?data={data}', verify=API_VERIFY) - if response.status_code == 200: - txid = response.json() - elif response.status_code == 404: - logger.info('Internal trigger has failed') - sys.exit(0) + def _get_txid(): + while True: + try: + response = requests.get(f'https://stdops.ranchimall.net/hash?data={data}', verify=API_VERIFY) + if response.status_code == 200: + txid = response.json() + return txid + elif response.status_code == 404: + logger.info(f'Internal trigger has failed (404) for getting txid from stdops.ranchimall.net. Retry in {RETRY_TIMEOUT_LONG}s') + time.sleep(RETRY_TIMEOUT_LONG) + except: + logger.info(f'Internal trigger has failed for getting txid from stdops.ranchimall.net. Retry in {RETRY_TIMEOUT_LONG}s') + time.sleep(RETRY_TIMEOUT_LONG) + transaction_data = {} - transaction_data['txid'] = txid + transaction_data['txid'] = _get_txid() transaction_data['blockheight'] = blockinfo['height'] transaction_data['time'] = blockinfo['time'] @@ -979,7 +1074,7 @@ def check_reorg(): continue else: logger.info('Response from the Blockbook API failed') - sys.exit(0) + sys.exit(0) #TODO test reorg fix and remove this connection.close() @@ -990,7 +1085,12 @@ def check_reorg(): return block_number def extract_contractStructure(contractName, contractAddress): - connection = create_database_connection('smart_contract', {'contract_name':f"{contractName}", 'contract_address':f"{contractAddress}"}) + while True: + try: + connection = create_database_connection('smart_contract', {'contract_name':f"{contractName}", 'contract_address':f"{contractAddress}"}) + break + except: + time.sleep(DB_RETRY_TIMEOUT) attributevaluepair = connection.execute("SELECT attribute, value FROM contractstructure WHERE attribute != 'flodata'").fetchall() contractStructure = {} conditionDict = {} @@ -2346,10 +2446,16 @@ def processTransaction(transaction_data, parsed_data, blockinfo): def scanBlockchain(): # Read start block no - session = create_database_session_orm('system_dbs', {'db_name': "system"}, SystemBase) - startblock = int(session.query(SystemData).filter_by(attribute='lastblockscanned').all()[0].value) + 1 - session.commit() - session.close() + while True: + try: + session = create_database_session_orm('system_dbs', {'db_name': "system"}, SystemBase) + startblock = int(session.query(SystemData).filter_by(attribute='lastblockscanned').all()[0].value) + 1 + session.commit() + session.close() + break + except: + logger.info(f"Unable to connect to 'system' database... retrying in {DB_RETRY_TIMEOUT} seconds") + time.sleep(DB_RETRY_TIMEOUT) # todo Rule 6 - Find current block height # Rule 7 - Start analysing the block contents from starting block to current height