Exception handle

- Adding exception handle for all fetch and db connections within a loop with retry timeout
- blocked all sys.exit to prevent system from shutting down. instead retry after 1 hr or 30 mins
This commit is contained in:
Sai Raj 2024-07-10 04:15:03 -04:00
parent 8464d48b0a
commit 5449ee83a5

View File

@ -5,7 +5,7 @@ import logging
import os
import shutil
import sys
import pyflo
#import pyflo
import requests
from sqlalchemy import create_engine, func, and_
from sqlalchemy.orm import sessionmaker
@ -25,6 +25,11 @@ import pdb
from util_rollback import rollback_to_block
RETRY_TIMEOUT_LONG = 30 * 60 # 30 mins
RETRY_TIMEOUT_SHORT = 60 # 1 min
DB_RETRY_TIMEOUT = 60 # 60 seconds
def newMultiRequest(apicall):
current_server = serverlist[0]
while True:
@ -75,7 +80,7 @@ def process_committee_flodata(flodata):
finally:
return flo_address_list
""" ?NOT USED?
def refresh_committee_list_old(admin_flo_id, api_url, blocktime):
response = requests.get(f'{api_url}api/v1/address/{admin_flo_id}', verify=API_VERIFY)
if response.status_code == 200:
@ -97,6 +102,7 @@ def refresh_committee_list_old(admin_flo_id, api_url, blocktime):
except:
continue
return committee_list
"""
def refresh_committee_list(admin_flo_id, api_url, blocktime):
@ -115,12 +121,18 @@ def refresh_committee_list(admin_flo_id, api_url, blocktime):
pass
def send_api_request(url):
response = requests.get(url, verify=API_VERIFY)
if response.status_code == 200:
return response.json()
else:
logger.info('Response from the Flosight API failed')
sys.exit(0)
while True:
try:
response = requests.get(url, verify=API_VERIFY)
if response.status_code == 200:
return response.json()
else:
logger.info(f'Response from the Flosight API failed. Retry in {RETRY_TIMEOUT_SHORT}s')
#sys.exit(0)
time.sleep(RETRY_TIMEOUT_SHORT)
except:
logger.info(f'Fetch from the Flosight API failed. Retry in {RETRY_TIMEOUT_LONG}s...')
time.sleep(RETRY_TIMEOUT_LONG)
url = f'{api_url}api/v1/address/{admin_flo_id}?details=txs'
response = send_api_request(url)
@ -251,30 +263,12 @@ def delete_contract_database(parameters):
def add_transaction_history(token_name, sourceFloAddress, destFloAddress, transferAmount, blockNumber, blockHash, blocktime, transactionHash, jsonData, transactionType, parsedFloData):
session = create_database_session_orm('token', {'token_name': token_name}, TokenBase)
blockchainReference = neturl + 'tx/' + transactionHash
session.add(TransactionHistory(
sourceFloAddress=sourceFloAddress,
destFloAddress=destFloAddress,
transferAmount=transferAmount,
blockNumber=blockNumber,
blockHash=blockHash,
time=blocktime,
transactionHash=transactionHash,
blockchainReference=blockchainReference,
jsonData=jsonData,
transactionType=transactionType,
parsedFloData=parsedFloData
))
session.commit()
session.close()
def add_contract_transaction_history(contract_name, contract_address, transactionType, transactionSubType, sourceFloAddress, destFloAddress, transferAmount, blockNumber, blockHash, blocktime, transactionHash, jsonData, parsedFloData):
session = create_database_session_orm('smart_contract', {'contract_name': f"{contract_name}", 'contract_address': f"{contract_address}"}, ContractBase)
blockchainReference = neturl + 'tx/' + transactionHash
session.add(ContractTransactionHistory(transactionType=transactionType,
sourceFloAddress=sourceFloAddress,
while True:
try:
session = create_database_session_orm('token', {'token_name': token_name}, TokenBase)
blockchainReference = neturl + 'tx/' + transactionHash
session.add(TransactionHistory(
sourceFloAddress=sourceFloAddress,
destFloAddress=destFloAddress,
transferAmount=transferAmount,
blockNumber=blockNumber,
@ -283,52 +277,93 @@ def add_contract_transaction_history(contract_name, contract_address, transactio
transactionHash=transactionHash,
blockchainReference=blockchainReference,
jsonData=jsonData,
transactionType=transactionType,
parsedFloData=parsedFloData
))
session.commit()
session.close()
))
session.commit()
session.close()
break
except:
logger.info(f"Unable to connect to 'token({token_name})' database... retrying in {DB_RETRY_TIMEOUT} seconds")
time.sleep(DB_RETRY_TIMEOUT)
def add_contract_transaction_history(contract_name, contract_address, transactionType, transactionSubType, sourceFloAddress, destFloAddress, transferAmount, blockNumber, blockHash, blocktime, transactionHash, jsonData, parsedFloData):
while True:
try:
session = create_database_session_orm('smart_contract', {'contract_name': f"{contract_name}", 'contract_address': f"{contract_address}"}, ContractBase)
blockchainReference = neturl + 'tx/' + transactionHash
session.add(ContractTransactionHistory(transactionType=transactionType,
sourceFloAddress=sourceFloAddress,
destFloAddress=destFloAddress,
transferAmount=transferAmount,
blockNumber=blockNumber,
blockHash=blockHash,
time=blocktime,
transactionHash=transactionHash,
blockchainReference=blockchainReference,
jsonData=jsonData,
parsedFloData=parsedFloData
))
session.commit()
session.close()
break
except:
logger.info(f"Unable to connect to 'smart_contract({contract_name})' database... retrying in {DB_RETRY_TIMEOUT} seconds")
time.sleep(DB_RETRY_TIMEOUT)
def rejected_transaction_history(transaction_data, parsed_data, sourceFloAddress, destFloAddress, rejectComment):
session = create_database_session_orm('system_dbs', {'db_name': "system"}, TokenBase)
blockchainReference = neturl + 'tx/' + transaction_data['txid']
session.add(RejectedTransactionHistory(tokenIdentification=parsed_data['tokenIdentification'],
sourceFloAddress=sourceFloAddress, destFloAddress=destFloAddress,
transferAmount=parsed_data['tokenAmount'],
blockNumber=transaction_data['blockheight'],
blockHash=transaction_data['blockhash'],
time=transaction_data['time'],
transactionHash=transaction_data['txid'],
blockchainReference=blockchainReference,
jsonData=json.dumps(transaction_data),
rejectComment=rejectComment,
transactionType=parsed_data['type'],
parsedFloData=json.dumps(parsed_data)
))
session.commit()
session.close()
while True:
try:
session = create_database_session_orm('system_dbs', {'db_name': "system"}, TokenBase)
blockchainReference = neturl + 'tx/' + transaction_data['txid']
session.add(RejectedTransactionHistory(tokenIdentification=parsed_data['tokenIdentification'],
sourceFloAddress=sourceFloAddress, destFloAddress=destFloAddress,
transferAmount=parsed_data['tokenAmount'],
blockNumber=transaction_data['blockheight'],
blockHash=transaction_data['blockhash'],
time=transaction_data['time'],
transactionHash=transaction_data['txid'],
blockchainReference=blockchainReference,
jsonData=json.dumps(transaction_data),
rejectComment=rejectComment,
transactionType=parsed_data['type'],
parsedFloData=json.dumps(parsed_data)
))
session.commit()
session.close()
break
except:
logger.info(f"Unable to connect to 'system' database... retrying in {DB_RETRY_TIMEOUT} seconds")
time.sleep(DB_RETRY_TIMEOUT)
def rejected_contract_transaction_history(transaction_data, parsed_data, transactionType, contractAddress, sourceFloAddress, destFloAddress, rejectComment):
session = create_database_session_orm('system_dbs', {'db_name': "system"}, SystemBase)
blockchainReference = neturl + 'tx/' + transaction_data['txid']
session.add(RejectedContractTransactionHistory(transactionType=transactionType,
contractName=parsed_data['contractName'],
contractAddress=contractAddress,
sourceFloAddress=sourceFloAddress,
destFloAddress=destFloAddress,
transferAmount=None,
blockNumber=transaction_data['blockheight'],
blockHash=transaction_data['blockhash'],
time=transaction_data['time'],
transactionHash=transaction_data['txid'],
blockchainReference=blockchainReference,
jsonData=json.dumps(transaction_data),
rejectComment=rejectComment,
parsedFloData=json.dumps(parsed_data)))
session.commit()
session.close()
while True:
try:
session = create_database_session_orm('system_dbs', {'db_name': "system"}, SystemBase)
blockchainReference = neturl + 'tx/' + transaction_data['txid']
session.add(RejectedContractTransactionHistory(transactionType=transactionType,
contractName=parsed_data['contractName'],
contractAddress=contractAddress,
sourceFloAddress=sourceFloAddress,
destFloAddress=destFloAddress,
transferAmount=None,
blockNumber=transaction_data['blockheight'],
blockHash=transaction_data['blockhash'],
time=transaction_data['time'],
transactionHash=transaction_data['txid'],
blockchainReference=blockchainReference,
jsonData=json.dumps(transaction_data),
rejectComment=rejectComment,
parsedFloData=json.dumps(parsed_data)))
session.commit()
session.close()
break
except:
logger.info(f"Unable to connect to 'system' database... retrying in {DB_RETRY_TIMEOUT} seconds")
time.sleep(DB_RETRY_TIMEOUT)
def convert_datetime_to_arrowobject(expiryTime):
expirytime_split = expiryTime.split(' ')
@ -349,19 +384,25 @@ def convert_datetime_to_arrowobject_regex(expiryTime):
def is_a_contract_address(floAddress):
# check contract address mapping db if the address is present, and return True or False based on that
system_db = create_database_session_orm('system_dbs', {'db_name':'system'}, SystemBase)
while True:
try:
# check contract address mapping db if the address is present, and return True or False based on that
session = create_database_session_orm('system_dbs', {'db_name':'system'}, SystemBase)
# contract_number = system_db.query(func.sum(ContractAddressMapping.contractAddress)).filter(ContractAddressMapping.contractAddress == floAddress).all()[0][0]
query_data = system_db.query(ContractAddressMapping.contractAddress).filter(ContractAddressMapping.contractAddress == floAddress).all()
contract_number = sum(Decimal(f"{amount[0]}") if amount[0] is not None else Decimal(0) for amount in query_data)
if contract_number is None or contract_number==0:
return False
else:
return True
# contract_number = session.query(func.sum(ContractAddressMapping.contractAddress)).filter(ContractAddressMapping.contractAddress == floAddress).all()[0][0]
query_data = session.query(ContractAddressMapping.contractAddress).filter(ContractAddressMapping.contractAddress == floAddress).all()
contract_number = sum(Decimal(f"{amount[0]}") if amount[0] is not None else Decimal(0) for amount in query_data)
session.close()
if contract_number is None or contract_number==0:
return False
else:
return True
except:
logger.info(f"Unable to connect to 'system' database... retrying in {DB_RETRY_TIMEOUT} seconds")
time.sleep(DB_RETRY_TIMEOUT)
""" ? NOT USED ?
def fetchDynamicSwapPrice_old(contractStructure, transaction_data, blockinfo):
oracle_address = contractStructure['oracle_address']
# fetch transactions from the blockchain where from address : oracle-address... to address: contract address
@ -403,7 +444,7 @@ def fetchDynamicSwapPrice_old(contractStructure, transaction_data, blockinfo):
else:
logger.info('API error fetchDynamicSwapPrice')
sys.exit(0)
"""
def fetchDynamicSwapPrice(contractStructure, blockinfo):
oracle_address = contractStructure['oracle_address']
@ -414,39 +455,46 @@ def fetchDynamicSwapPrice(contractStructure, blockinfo):
latest_param = 'true'
mempool_param = 'false'
init_id = None
response = requests.get(f'{api_url}api/v1/address/{oracle_address}?details=txs', verify=API_VERIFY)
if response.status_code == 200:
response = response.json()
if len(response['txs']) == 0:
return float(contractStructure['price'])
else:
for transaction in response['txs']:
if 'floData' in transaction.keys():
floData = transaction['floData']
while True:
try:
response = requests.get(f'{api_url}api/v1/address/{oracle_address}?details=txs', verify=API_VERIFY)
if response.status_code == 200:
response = response.json()
if len(response['txs']) == 0:
return float(contractStructure['price'])
else:
floData = ''
# If the blocktime of the transaction is < than the current block time
if transaction['time'] < blockinfo['time']:
# Check if flodata is in the format we are looking for
# ie. {"price-update":{"contract-name": "", "contract-address": "", "price": 3}}
# and receiver address should be contractAddress
try:
sender_address, receiver_address = find_sender_receiver(transaction)
assert sender_address == oracle_address
assert receiver_address == contractStructure['contractAddress']
floData = json.loads(floData)
# Check if the contract name and address are right
assert floData['price-update']['contract-name'] == contractStructure['contractName']
assert floData['price-update']['contract-address'] == contractStructure['contractAddress']
return float(floData['price-update']['price'])
except:
continue
else:
continue
else:
logger.info('API error fetchDynamicSwapPrice')
sys.exit(0)
for transaction in response['txs']:
if 'floData' in transaction.keys():
floData = transaction['floData']
else:
floData = ''
# If the blocktime of the transaction is < than the current block time
if transaction['time'] < blockinfo['time']:
# Check if flodata is in the format we are looking for
# ie. {"price-update":{"contract-name": "", "contract-address": "", "price": 3}}
# and receiver address should be contractAddress
try:
sender_address, receiver_address = find_sender_receiver(transaction)
assert sender_address == oracle_address
assert receiver_address == contractStructure['contractAddress']
floData = json.loads(floData)
# Check if the contract name and address are right
assert floData['price-update']['contract-name'] == contractStructure['contractName']
assert floData['price-update']['contract-address'] == contractStructure['contractAddress']
return float(floData['price-update']['price'])
except:
continue
else:
continue
break
else:
logger.info(f'API error fetchDynamicSwapPrice. Retry in {RETRY_TIMEOUT_LONG}s...')
#sys.exit(0)
time.sleep(RETRY_TIMEOUT_LONG)
except:
logger.info(f'API error fetchDynamicSwapPrice. Retry in {RETRY_TIMEOUT_LONG}s...')
time.sleep(RETRY_TIMEOUT_LONG)
return float(contractStructure['price'])
@ -462,7 +510,9 @@ def processBlock(blockindex=None, blockhash=None):
logger.info(f"API call block-index/{blockindex} failed to give proper response. Retrying.")
blockinfo = newMultiRequest(f"block/{blockhash}")
#TODO: Check for reorg in here
# Check and perform operations which do not require blockchain intervention
checkLocal_expiry_trigger_deposit(blockinfo)
@ -502,17 +552,27 @@ def processBlock(blockindex=None, blockhash=None):
tempinfo.remove(tx)
blockinfo['txs'] = tempinfo
updateLatestBlock(blockinfo)
session = create_database_session_orm('system_dbs', {'db_name': "system"}, SystemBase)
entry = session.query(SystemData).filter(SystemData.attribute == 'lastblockscanned').all()[0]
entry.value = str(blockinfo['height'])
session.commit()
session.close()
while True:
try:
session = create_database_session_orm('system_dbs', {'db_name': "system"}, SystemBase)
entry = session.query(SystemData).filter(SystemData.attribute == 'lastblockscanned').all()[0]
entry.value = str(blockinfo['height'])
session.commit()
session.close()
except:
logger.info(f"Unable to connect to 'system' database... retrying in {DB_RETRY_TIMEOUT} seconds")
time.sleep(DB_RETRY_TIMEOUT)
def updateLatestTransaction(transactionData, parsed_data, db_reference, transactionType=None ):
# connect to latest transaction db
conn = create_database_connection('latest_cache', {'db_name':"latestCache"})
while True:
try:
conn = create_database_connection('latest_cache', {'db_name':"latestCache"})
break
except:
time.sleep(DB_RETRY_TIMEOUT)
if transactionType is None:
transactionType = parsed_data['type']
conn.execute("INSERT INTO latestTransactions(transactionHash, blockNumber, jsonData, transactionType, parsedFloData, db_reference) VALUES (?,?,?,?,?,?)", (transactionData['txid'], transactionData['blockheight'], json.dumps(transactionData), transactionType, json.dumps(parsed_data), db_reference))
@ -522,7 +582,12 @@ def updateLatestTransaction(transactionData, parsed_data, db_reference, transact
def updateLatestBlock(blockData):
# connect to latest block db
conn = create_database_connection('latest_cache', {'db_name':"latestCache"})
while True:
try:
conn = create_database_connection('latest_cache', {'db_name':"latestCache"})
break
except:
time.sleep(DB_RETRY_TIMEOUT)
conn.execute('INSERT INTO latestBlocks(blockNumber, blockHash, jsonData) VALUES (?,?,?)', (blockData['height'], blockData['hash'], json.dumps(blockData)))
#conn.commit()
conn.close()
@ -551,7 +616,13 @@ def transferToken(tokenIdentification, tokenAmount, inputAddress, outputAddress,
except:
logger.info("This is a critical error. Please report to developers")
session = create_database_session_orm('token', {'token_name': f"{tokenIdentification}"}, TokenBase)
while True:
try:
session = create_database_session_orm('token', {'token_name': f"{tokenIdentification}"}, TokenBase)
break
except:
time.sleep(DB_RETRY_TIMEOUT)
tokenAmount = float(tokenAmount)
if isInfiniteToken == True:
# Make new entry
@ -777,13 +848,23 @@ def check_contract_status(contractName, contractAddress):
# Status of the contract is at 2 tables in system.db
# activecontracts and time_actions
# select the last entry form the colum
connection = create_database_connection('system_dbs')
while True:
try:
connection = create_database_connection('system_dbs')
break
except:
time.sleep(DB_RETRY_TIMEOUT)
contract_status = connection.execute(f'SELECT status FROM time_actions WHERE id=(SELECT MAX(id) FROM time_actions WHERE contractName="{contractName}" AND contractAddress="{contractAddress}")').fetchall()
return contract_status[0][0]
def close_expire_contract(contractStructure, contractStatus, transactionHash, blockNumber, blockHash, incorporationDate, expiryDate, closeDate, trigger_time, trigger_activity, contractName, contractAddress, contractType, tokens_db, parsed_data, blockHeight):
connection = create_database_connection('system_dbs', {'db_name':'system'})
while True:
try:
connection = create_database_connection('system_dbs', {'db_name':'system'})
break
except:
time.sleep(DB_RETRY_TIMEOUT)
connection.execute('INSERT INTO activecontracts VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)', (None, contractStructure['contractName'], contractStructure['contractAddress'], contractStatus, contractStructure['tokenIdentification'], contractStructure['contractType'], transactionHash, blockNumber, blockHash, incorporationDate, expiryDate, closeDate))
connection.execute('INSERT INTO time_actions VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)', (None, trigger_time, trigger_activity, contractStatus, contractName, contractAddress, contractType, tokens_db, parsed_data, transactionHash, blockHeight))
connection.close()
@ -804,7 +885,13 @@ def return_active_deposits(session):
def checkLocal_expiry_trigger_deposit(blockinfo):
# Connect to system.db with a session
systemdb_session = create_database_session_orm('system_dbs', {'db_name':'system'}, SystemBase)
while True:
try:
systemdb_session = create_database_session_orm('system_dbs', {'db_name':'system'}, SystemBase)
break
except:
time.sleep(DB_RETRY_TIMEOUT)
timeactions_tx_hashes = []
active_contracts = return_active_contracts(systemdb_session)
active_deposits = return_active_deposits(systemdb_session)
@ -821,15 +908,23 @@ def checkLocal_expiry_trigger_deposit(blockinfo):
tx_type = 'trigger'
data = [blockinfo['hash'], blockinfo['height'], blockinfo['time'], blockinfo['size'], tx_type]
response = requests.get(f'https://stdops.ranchimall.net/hash?data={data}', verify=API_VERIFY)
if response.status_code == 200:
txid = response.json()
elif response.status_code == 404:
logger.info('Internal trigger has failed')
sys.exit(0)
def _get_txid():
while True:
try:
response = requests.get(f'https://stdops.ranchimall.net/hash?data={data}', verify=API_VERIFY)
if response.status_code == 200:
txid = response.json()
return txid
elif response.status_code == 404:
logger.info(f'Internal trigger has failed (404) for getting txid from stdops.ranchimall.net. Retry in {RETRY_TIMEOUT_LONG}s')
time.sleep(RETRY_TIMEOUT_LONG)
except:
logger.info(f'Internal trigger has failed for getting txid from stdops.ranchimall.net. Retry in {RETRY_TIMEOUT_LONG}s')
time.sleep(RETRY_TIMEOUT_LONG)
transaction_data = {}
transaction_data['txid'] = txid
transaction_data['txid'] = _get_txid()
transaction_data['blockheight'] = blockinfo['height']
transaction_data['time'] = blockinfo['time']
@ -979,7 +1074,7 @@ def check_reorg():
continue
else:
logger.info('Response from the Blockbook API failed')
sys.exit(0)
sys.exit(0) #TODO test reorg fix and remove this
connection.close()
@ -990,7 +1085,12 @@ def check_reorg():
return block_number
def extract_contractStructure(contractName, contractAddress):
connection = create_database_connection('smart_contract', {'contract_name':f"{contractName}", 'contract_address':f"{contractAddress}"})
while True:
try:
connection = create_database_connection('smart_contract', {'contract_name':f"{contractName}", 'contract_address':f"{contractAddress}"})
break
except:
time.sleep(DB_RETRY_TIMEOUT)
attributevaluepair = connection.execute("SELECT attribute, value FROM contractstructure WHERE attribute != 'flodata'").fetchall()
contractStructure = {}
conditionDict = {}
@ -2346,10 +2446,16 @@ def processTransaction(transaction_data, parsed_data, blockinfo):
def scanBlockchain():
# Read start block no
session = create_database_session_orm('system_dbs', {'db_name': "system"}, SystemBase)
startblock = int(session.query(SystemData).filter_by(attribute='lastblockscanned').all()[0].value) + 1
session.commit()
session.close()
while True:
try:
session = create_database_session_orm('system_dbs', {'db_name': "system"}, SystemBase)
startblock = int(session.query(SystemData).filter_by(attribute='lastblockscanned').all()[0].value) + 1
session.commit()
session.close()
break
except:
logger.info(f"Unable to connect to 'system' database... retrying in {DB_RETRY_TIMEOUT} seconds")
time.sleep(DB_RETRY_TIMEOUT)
# todo Rule 6 - Find current block height
# Rule 7 - Start analysing the block contents from starting block to current height