Syncing with server version tracktokens_smartcontracts.py
This commit is contained in:
parent
5b031dae34
commit
6f50c3d03d
@ -5,7 +5,7 @@ import logging
|
|||||||
import os
|
import os
|
||||||
import shutil
|
import shutil
|
||||||
import sys
|
import sys
|
||||||
import pyflo
|
#import pyflo
|
||||||
import requests
|
import requests
|
||||||
from sqlalchemy import create_engine, func, and_
|
from sqlalchemy import create_engine, func, and_
|
||||||
from sqlalchemy.orm import sessionmaker
|
from sqlalchemy.orm import sessionmaker
|
||||||
@ -22,6 +22,19 @@ import asyncio
|
|||||||
import websockets
|
import websockets
|
||||||
from decimal import Decimal
|
from decimal import Decimal
|
||||||
import pdb
|
import pdb
|
||||||
|
from util_rollback import rollback_to_block
|
||||||
|
|
||||||
|
import requests
|
||||||
|
from requests.packages.urllib3.exceptions import InsecureRequestWarning
|
||||||
|
|
||||||
|
# Disable the InsecureRequestWarning
|
||||||
|
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
RETRY_TIMEOUT_LONG = 30 * 60 # 30 mins
|
||||||
|
RETRY_TIMEOUT_SHORT = 60 # 1 min
|
||||||
|
DB_RETRY_TIMEOUT = 60 # 60 seconds
|
||||||
|
|
||||||
|
|
||||||
def newMultiRequest(apicall):
|
def newMultiRequest(apicall):
|
||||||
@ -74,7 +87,7 @@ def process_committee_flodata(flodata):
|
|||||||
finally:
|
finally:
|
||||||
return flo_address_list
|
return flo_address_list
|
||||||
|
|
||||||
|
""" ?NOT USED?
|
||||||
def refresh_committee_list_old(admin_flo_id, api_url, blocktime):
|
def refresh_committee_list_old(admin_flo_id, api_url, blocktime):
|
||||||
response = requests.get(f'{api_url}api/v1/address/{admin_flo_id}', verify=API_VERIFY)
|
response = requests.get(f'{api_url}api/v1/address/{admin_flo_id}', verify=API_VERIFY)
|
||||||
if response.status_code == 200:
|
if response.status_code == 200:
|
||||||
@ -96,6 +109,7 @@ def refresh_committee_list_old(admin_flo_id, api_url, blocktime):
|
|||||||
except:
|
except:
|
||||||
continue
|
continue
|
||||||
return committee_list
|
return committee_list
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
def refresh_committee_list(admin_flo_id, api_url, blocktime):
|
def refresh_committee_list(admin_flo_id, api_url, blocktime):
|
||||||
@ -114,12 +128,18 @@ def refresh_committee_list(admin_flo_id, api_url, blocktime):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
def send_api_request(url):
|
def send_api_request(url):
|
||||||
response = requests.get(url, verify=API_VERIFY)
|
while True:
|
||||||
if response.status_code == 200:
|
try:
|
||||||
return response.json()
|
response = requests.get(url, verify=API_VERIFY)
|
||||||
else:
|
if response.status_code == 200:
|
||||||
logger.info('Response from the Flosight API failed')
|
return response.json()
|
||||||
sys.exit(0)
|
else:
|
||||||
|
logger.info(f'Response from the Flosight API failed. Retry in {RETRY_TIMEOUT_SHORT}s')
|
||||||
|
#sys.exit(0)
|
||||||
|
time.sleep(RETRY_TIMEOUT_SHORT)
|
||||||
|
except:
|
||||||
|
logger.info(f'Fetch from the Flosight API failed. Retry in {RETRY_TIMEOUT_LONG}s...')
|
||||||
|
time.sleep(RETRY_TIMEOUT_LONG)
|
||||||
|
|
||||||
url = f'{api_url}api/v1/address/{admin_flo_id}?details=txs'
|
url = f'{api_url}api/v1/address/{admin_flo_id}?details=txs'
|
||||||
response = send_api_request(url)
|
response = send_api_request(url)
|
||||||
@ -250,30 +270,12 @@ def delete_contract_database(parameters):
|
|||||||
|
|
||||||
|
|
||||||
def add_transaction_history(token_name, sourceFloAddress, destFloAddress, transferAmount, blockNumber, blockHash, blocktime, transactionHash, jsonData, transactionType, parsedFloData):
|
def add_transaction_history(token_name, sourceFloAddress, destFloAddress, transferAmount, blockNumber, blockHash, blocktime, transactionHash, jsonData, transactionType, parsedFloData):
|
||||||
session = create_database_session_orm('token', {'token_name': token_name}, TokenBase)
|
while True:
|
||||||
blockchainReference = neturl + 'tx/' + transactionHash
|
try:
|
||||||
session.add(TransactionHistory(
|
session = create_database_session_orm('token', {'token_name': token_name}, TokenBase)
|
||||||
sourceFloAddress=sourceFloAddress,
|
blockchainReference = neturl + 'tx/' + transactionHash
|
||||||
destFloAddress=destFloAddress,
|
session.add(TransactionHistory(
|
||||||
transferAmount=transferAmount,
|
sourceFloAddress=sourceFloAddress,
|
||||||
blockNumber=blockNumber,
|
|
||||||
blockHash=blockHash,
|
|
||||||
time=blocktime,
|
|
||||||
transactionHash=transactionHash,
|
|
||||||
blockchainReference=blockchainReference,
|
|
||||||
jsonData=jsonData,
|
|
||||||
transactionType=transactionType,
|
|
||||||
parsedFloData=parsedFloData
|
|
||||||
))
|
|
||||||
session.commit()
|
|
||||||
session.close()
|
|
||||||
|
|
||||||
|
|
||||||
def add_contract_transaction_history(contract_name, contract_address, transactionType, transactionSubType, sourceFloAddress, destFloAddress, transferAmount, blockNumber, blockHash, blocktime, transactionHash, jsonData, parsedFloData):
|
|
||||||
session = create_database_session_orm('smart_contract', {'contract_name': f"{contract_name}", 'contract_address': f"{contract_address}"}, ContractBase)
|
|
||||||
blockchainReference = neturl + 'tx/' + transactionHash
|
|
||||||
session.add(ContractTransactionHistory(transactionType=transactionType,
|
|
||||||
sourceFloAddress=sourceFloAddress,
|
|
||||||
destFloAddress=destFloAddress,
|
destFloAddress=destFloAddress,
|
||||||
transferAmount=transferAmount,
|
transferAmount=transferAmount,
|
||||||
blockNumber=blockNumber,
|
blockNumber=blockNumber,
|
||||||
@ -282,52 +284,93 @@ def add_contract_transaction_history(contract_name, contract_address, transactio
|
|||||||
transactionHash=transactionHash,
|
transactionHash=transactionHash,
|
||||||
blockchainReference=blockchainReference,
|
blockchainReference=blockchainReference,
|
||||||
jsonData=jsonData,
|
jsonData=jsonData,
|
||||||
|
transactionType=transactionType,
|
||||||
parsedFloData=parsedFloData
|
parsedFloData=parsedFloData
|
||||||
))
|
))
|
||||||
session.commit()
|
session.commit()
|
||||||
session.close()
|
session.close()
|
||||||
|
break
|
||||||
|
except:
|
||||||
|
logger.info(f"Unable to connect to 'token({token_name})' database... retrying in {DB_RETRY_TIMEOUT} seconds")
|
||||||
|
time.sleep(DB_RETRY_TIMEOUT)
|
||||||
|
|
||||||
|
|
||||||
|
def add_contract_transaction_history(contract_name, contract_address, transactionType, transactionSubType, sourceFloAddress, destFloAddress, transferAmount, blockNumber, blockHash, blocktime, transactionHash, jsonData, parsedFloData):
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
session = create_database_session_orm('smart_contract', {'contract_name': f"{contract_name}", 'contract_address': f"{contract_address}"}, ContractBase)
|
||||||
|
blockchainReference = neturl + 'tx/' + transactionHash
|
||||||
|
session.add(ContractTransactionHistory(transactionType=transactionType,
|
||||||
|
sourceFloAddress=sourceFloAddress,
|
||||||
|
destFloAddress=destFloAddress,
|
||||||
|
transferAmount=transferAmount,
|
||||||
|
blockNumber=blockNumber,
|
||||||
|
blockHash=blockHash,
|
||||||
|
time=blocktime,
|
||||||
|
transactionHash=transactionHash,
|
||||||
|
blockchainReference=blockchainReference,
|
||||||
|
jsonData=jsonData,
|
||||||
|
parsedFloData=parsedFloData
|
||||||
|
))
|
||||||
|
session.commit()
|
||||||
|
session.close()
|
||||||
|
break
|
||||||
|
except:
|
||||||
|
logger.info(f"Unable to connect to 'smart_contract({contract_name})' database... retrying in {DB_RETRY_TIMEOUT} seconds")
|
||||||
|
time.sleep(DB_RETRY_TIMEOUT)
|
||||||
|
|
||||||
|
|
||||||
def rejected_transaction_history(transaction_data, parsed_data, sourceFloAddress, destFloAddress, rejectComment):
|
def rejected_transaction_history(transaction_data, parsed_data, sourceFloAddress, destFloAddress, rejectComment):
|
||||||
session = create_database_session_orm('system_dbs', {'db_name': "system"}, TokenBase)
|
while True:
|
||||||
blockchainReference = neturl + 'tx/' + transaction_data['txid']
|
try:
|
||||||
session.add(RejectedTransactionHistory(tokenIdentification=parsed_data['tokenIdentification'],
|
session = create_database_session_orm('system_dbs', {'db_name': "system"}, TokenBase)
|
||||||
sourceFloAddress=sourceFloAddress, destFloAddress=destFloAddress,
|
blockchainReference = neturl + 'tx/' + transaction_data['txid']
|
||||||
transferAmount=parsed_data['tokenAmount'],
|
session.add(RejectedTransactionHistory(tokenIdentification=parsed_data['tokenIdentification'],
|
||||||
blockNumber=transaction_data['blockheight'],
|
sourceFloAddress=sourceFloAddress, destFloAddress=destFloAddress,
|
||||||
blockHash=transaction_data['blockhash'],
|
transferAmount=parsed_data['tokenAmount'],
|
||||||
time=transaction_data['time'],
|
blockNumber=transaction_data['blockheight'],
|
||||||
transactionHash=transaction_data['txid'],
|
blockHash=transaction_data['blockhash'],
|
||||||
blockchainReference=blockchainReference,
|
time=transaction_data['time'],
|
||||||
jsonData=json.dumps(transaction_data),
|
transactionHash=transaction_data['txid'],
|
||||||
rejectComment=rejectComment,
|
blockchainReference=blockchainReference,
|
||||||
transactionType=parsed_data['type'],
|
jsonData=json.dumps(transaction_data),
|
||||||
parsedFloData=json.dumps(parsed_data)
|
rejectComment=rejectComment,
|
||||||
))
|
transactionType=parsed_data['type'],
|
||||||
session.commit()
|
parsedFloData=json.dumps(parsed_data)
|
||||||
session.close()
|
))
|
||||||
|
session.commit()
|
||||||
|
session.close()
|
||||||
|
break
|
||||||
|
except:
|
||||||
|
logger.info(f"Unable to connect to 'system' database... retrying in {DB_RETRY_TIMEOUT} seconds")
|
||||||
|
time.sleep(DB_RETRY_TIMEOUT)
|
||||||
|
|
||||||
|
|
||||||
def rejected_contract_transaction_history(transaction_data, parsed_data, transactionType, contractAddress, sourceFloAddress, destFloAddress, rejectComment):
|
def rejected_contract_transaction_history(transaction_data, parsed_data, transactionType, contractAddress, sourceFloAddress, destFloAddress, rejectComment):
|
||||||
session = create_database_session_orm('system_dbs', {'db_name': "system"}, SystemBase)
|
while True:
|
||||||
blockchainReference = neturl + 'tx/' + transaction_data['txid']
|
try:
|
||||||
session.add(RejectedContractTransactionHistory(transactionType=transactionType,
|
session = create_database_session_orm('system_dbs', {'db_name': "system"}, SystemBase)
|
||||||
contractName=parsed_data['contractName'],
|
blockchainReference = neturl + 'tx/' + transaction_data['txid']
|
||||||
contractAddress=contractAddress,
|
session.add(RejectedContractTransactionHistory(transactionType=transactionType,
|
||||||
sourceFloAddress=sourceFloAddress,
|
contractName=parsed_data['contractName'],
|
||||||
destFloAddress=destFloAddress,
|
contractAddress=contractAddress,
|
||||||
transferAmount=None,
|
sourceFloAddress=sourceFloAddress,
|
||||||
blockNumber=transaction_data['blockheight'],
|
destFloAddress=destFloAddress,
|
||||||
blockHash=transaction_data['blockhash'],
|
transferAmount=None,
|
||||||
time=transaction_data['time'],
|
blockNumber=transaction_data['blockheight'],
|
||||||
transactionHash=transaction_data['txid'],
|
blockHash=transaction_data['blockhash'],
|
||||||
blockchainReference=blockchainReference,
|
time=transaction_data['time'],
|
||||||
jsonData=json.dumps(transaction_data),
|
transactionHash=transaction_data['txid'],
|
||||||
rejectComment=rejectComment,
|
blockchainReference=blockchainReference,
|
||||||
parsedFloData=json.dumps(parsed_data)))
|
jsonData=json.dumps(transaction_data),
|
||||||
session.commit()
|
rejectComment=rejectComment,
|
||||||
session.close()
|
parsedFloData=json.dumps(parsed_data)))
|
||||||
|
session.commit()
|
||||||
|
session.close()
|
||||||
|
break
|
||||||
|
except:
|
||||||
|
logger.info(f"Unable to connect to 'system' database... retrying in {DB_RETRY_TIMEOUT} seconds")
|
||||||
|
time.sleep(DB_RETRY_TIMEOUT)
|
||||||
|
|
||||||
def convert_datetime_to_arrowobject(expiryTime):
|
def convert_datetime_to_arrowobject(expiryTime):
|
||||||
expirytime_split = expiryTime.split(' ')
|
expirytime_split = expiryTime.split(' ')
|
||||||
@ -348,19 +391,25 @@ def convert_datetime_to_arrowobject_regex(expiryTime):
|
|||||||
|
|
||||||
|
|
||||||
def is_a_contract_address(floAddress):
|
def is_a_contract_address(floAddress):
|
||||||
# check contract address mapping db if the address is present, and return True or False based on that
|
while True:
|
||||||
system_db = create_database_session_orm('system_dbs', {'db_name':'system'}, SystemBase)
|
try:
|
||||||
|
# check contract address mapping db if the address is present, and return True or False based on that
|
||||||
|
session = create_database_session_orm('system_dbs', {'db_name':'system'}, SystemBase)
|
||||||
|
|
||||||
# contract_number = system_db.query(func.sum(ContractAddressMapping.contractAddress)).filter(ContractAddressMapping.contractAddress == floAddress).all()[0][0]
|
# contract_number = session.query(func.sum(ContractAddressMapping.contractAddress)).filter(ContractAddressMapping.contractAddress == floAddress).all()[0][0]
|
||||||
query_data = system_db.query(ContractAddressMapping.contractAddress).filter(ContractAddressMapping.contractAddress == floAddress).all()
|
query_data = session.query(ContractAddressMapping.contractAddress).filter(ContractAddressMapping.contractAddress == floAddress).all()
|
||||||
contract_number = sum(Decimal(f"{amount[0]}") if amount[0] is not None else Decimal(0) for amount in query_data)
|
contract_number = sum(Decimal(f"{amount[0]}") if amount[0] is not None else Decimal(0) for amount in query_data)
|
||||||
|
session.close()
|
||||||
if contract_number is None or contract_number==0:
|
|
||||||
return False
|
|
||||||
else:
|
|
||||||
return True
|
|
||||||
|
|
||||||
|
if contract_number is None or contract_number==0:
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
return True
|
||||||
|
except:
|
||||||
|
logger.info(f"Unable to connect to 'system' database... retrying in {DB_RETRY_TIMEOUT} seconds")
|
||||||
|
time.sleep(DB_RETRY_TIMEOUT)
|
||||||
|
|
||||||
|
""" ? NOT USED ?
|
||||||
def fetchDynamicSwapPrice_old(contractStructure, transaction_data, blockinfo):
|
def fetchDynamicSwapPrice_old(contractStructure, transaction_data, blockinfo):
|
||||||
oracle_address = contractStructure['oracle_address']
|
oracle_address = contractStructure['oracle_address']
|
||||||
# fetch transactions from the blockchain where from address : oracle-address... to address: contract address
|
# fetch transactions from the blockchain where from address : oracle-address... to address: contract address
|
||||||
@ -402,7 +451,7 @@ def fetchDynamicSwapPrice_old(contractStructure, transaction_data, blockinfo):
|
|||||||
else:
|
else:
|
||||||
logger.info('API error fetchDynamicSwapPrice')
|
logger.info('API error fetchDynamicSwapPrice')
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
|
"""
|
||||||
|
|
||||||
def fetchDynamicSwapPrice(contractStructure, blockinfo):
|
def fetchDynamicSwapPrice(contractStructure, blockinfo):
|
||||||
oracle_address = contractStructure['oracle_address']
|
oracle_address = contractStructure['oracle_address']
|
||||||
@ -413,39 +462,46 @@ def fetchDynamicSwapPrice(contractStructure, blockinfo):
|
|||||||
latest_param = 'true'
|
latest_param = 'true'
|
||||||
mempool_param = 'false'
|
mempool_param = 'false'
|
||||||
init_id = None
|
init_id = None
|
||||||
response = requests.get(f'{api_url}api/v1/address/{oracle_address}?details=txs', verify=API_VERIFY)
|
while True:
|
||||||
if response.status_code == 200:
|
try:
|
||||||
response = response.json()
|
response = requests.get(f'{api_url}api/v1/address/{oracle_address}?details=txs', verify=API_VERIFY)
|
||||||
if len(response['txs']) == 0:
|
if response.status_code == 200:
|
||||||
return float(contractStructure['price'])
|
response = response.json()
|
||||||
else:
|
if len(response['txs']) == 0:
|
||||||
for transaction in response['txs']:
|
return float(contractStructure['price'])
|
||||||
if 'floData' in transaction.keys():
|
|
||||||
floData = transaction['floData']
|
|
||||||
else:
|
else:
|
||||||
floData = ''
|
for transaction in response['txs']:
|
||||||
# If the blocktime of the transaction is < than the current block time
|
if 'floData' in transaction.keys():
|
||||||
if transaction['time'] < blockinfo['time']:
|
floData = transaction['floData']
|
||||||
# Check if flodata is in the format we are looking for
|
else:
|
||||||
# ie. {"price-update":{"contract-name": "", "contract-address": "", "price": 3}}
|
floData = ''
|
||||||
# and receiver address should be contractAddress
|
# If the blocktime of the transaction is < than the current block time
|
||||||
try:
|
if transaction['time'] < blockinfo['time']:
|
||||||
sender_address, receiver_address = find_sender_receiver(transaction)
|
# Check if flodata is in the format we are looking for
|
||||||
assert sender_address == oracle_address
|
# ie. {"price-update":{"contract-name": "", "contract-address": "", "price": 3}}
|
||||||
assert receiver_address == contractStructure['contractAddress']
|
# and receiver address should be contractAddress
|
||||||
floData = json.loads(floData)
|
try:
|
||||||
# Check if the contract name and address are right
|
sender_address, receiver_address = find_sender_receiver(transaction)
|
||||||
assert floData['price-update']['contract-name'] == contractStructure['contractName']
|
assert sender_address == oracle_address
|
||||||
assert floData['price-update']['contract-address'] == contractStructure['contractAddress']
|
assert receiver_address == contractStructure['contractAddress']
|
||||||
return float(floData['price-update']['price'])
|
floData = json.loads(floData)
|
||||||
except:
|
# Check if the contract name and address are right
|
||||||
continue
|
assert floData['price-update']['contract-name'] == contractStructure['contractName']
|
||||||
else:
|
assert floData['price-update']['contract-address'] == contractStructure['contractAddress']
|
||||||
continue
|
return float(floData['price-update']['price'])
|
||||||
else:
|
except:
|
||||||
logger.info('API error fetchDynamicSwapPrice')
|
continue
|
||||||
sys.exit(0)
|
else:
|
||||||
|
continue
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
logger.info(f'API error fetchDynamicSwapPrice. Retry in {RETRY_TIMEOUT_LONG}s...')
|
||||||
|
#sys.exit(0)
|
||||||
|
time.sleep(RETRY_TIMEOUT_LONG)
|
||||||
|
except:
|
||||||
|
logger.info(f'API error fetchDynamicSwapPrice. Retry in {RETRY_TIMEOUT_LONG}s...')
|
||||||
|
time.sleep(RETRY_TIMEOUT_LONG)
|
||||||
|
|
||||||
return float(contractStructure['price'])
|
return float(contractStructure['price'])
|
||||||
|
|
||||||
|
|
||||||
@ -461,7 +517,9 @@ def processBlock(blockindex=None, blockhash=None):
|
|||||||
logger.info(f"API call block-index/{blockindex} failed to give proper response. Retrying.")
|
logger.info(f"API call block-index/{blockindex} failed to give proper response. Retrying.")
|
||||||
|
|
||||||
blockinfo = newMultiRequest(f"block/{blockhash}")
|
blockinfo = newMultiRequest(f"block/{blockhash}")
|
||||||
|
|
||||||
|
#TODO: Check for reorg in here
|
||||||
|
|
||||||
# Check and perform operations which do not require blockchain intervention
|
# Check and perform operations which do not require blockchain intervention
|
||||||
checkLocal_expiry_trigger_deposit(blockinfo)
|
checkLocal_expiry_trigger_deposit(blockinfo)
|
||||||
|
|
||||||
@ -501,17 +559,26 @@ def processBlock(blockindex=None, blockhash=None):
|
|||||||
tempinfo.remove(tx)
|
tempinfo.remove(tx)
|
||||||
blockinfo['txs'] = tempinfo
|
blockinfo['txs'] = tempinfo
|
||||||
updateLatestBlock(blockinfo)
|
updateLatestBlock(blockinfo)
|
||||||
|
|
||||||
session = create_database_session_orm('system_dbs', {'db_name': "system"}, SystemBase)
|
try:
|
||||||
entry = session.query(SystemData).filter(SystemData.attribute == 'lastblockscanned').all()[0]
|
session = create_database_session_orm('system_dbs', {'db_name': "system"}, SystemBase)
|
||||||
entry.value = str(blockinfo['height'])
|
entry = session.query(SystemData).filter(SystemData.attribute == 'lastblockscanned').all()[0]
|
||||||
session.commit()
|
entry.value = str(blockinfo['height'])
|
||||||
session.close()
|
session.commit()
|
||||||
|
session.close()
|
||||||
|
except:
|
||||||
|
logger.info(f"Unable to connect to 'system' database... retrying in {DB_RETRY_TIMEOUT} seconds")
|
||||||
|
time.sleep(DB_RETRY_TIMEOUT)
|
||||||
|
|
||||||
|
|
||||||
def updateLatestTransaction(transactionData, parsed_data, db_reference, transactionType=None ):
|
def updateLatestTransaction(transactionData, parsed_data, db_reference, transactionType=None ):
|
||||||
# connect to latest transaction db
|
# connect to latest transaction db
|
||||||
conn = create_database_connection('latest_cache', {'db_name':"latestCache"})
|
while True:
|
||||||
|
try:
|
||||||
|
conn = create_database_connection('latest_cache', {'db_name':"latestCache"})
|
||||||
|
break
|
||||||
|
except:
|
||||||
|
time.sleep(DB_RETRY_TIMEOUT)
|
||||||
if transactionType is None:
|
if transactionType is None:
|
||||||
transactionType = parsed_data['type']
|
transactionType = parsed_data['type']
|
||||||
conn.execute("INSERT INTO latestTransactions(transactionHash, blockNumber, jsonData, transactionType, parsedFloData, db_reference) VALUES (?,?,?,?,?,?)", (transactionData['txid'], transactionData['blockheight'], json.dumps(transactionData), transactionType, json.dumps(parsed_data), db_reference))
|
conn.execute("INSERT INTO latestTransactions(transactionHash, blockNumber, jsonData, transactionType, parsedFloData, db_reference) VALUES (?,?,?,?,?,?)", (transactionData['txid'], transactionData['blockheight'], json.dumps(transactionData), transactionType, json.dumps(parsed_data), db_reference))
|
||||||
@ -521,7 +588,12 @@ def updateLatestTransaction(transactionData, parsed_data, db_reference, transact
|
|||||||
|
|
||||||
def updateLatestBlock(blockData):
|
def updateLatestBlock(blockData):
|
||||||
# connect to latest block db
|
# connect to latest block db
|
||||||
conn = create_database_connection('latest_cache', {'db_name':"latestCache"})
|
while True:
|
||||||
|
try:
|
||||||
|
conn = create_database_connection('latest_cache', {'db_name':"latestCache"})
|
||||||
|
break
|
||||||
|
except:
|
||||||
|
time.sleep(DB_RETRY_TIMEOUT)
|
||||||
conn.execute('INSERT INTO latestBlocks(blockNumber, blockHash, jsonData) VALUES (?,?,?)', (blockData['height'], blockData['hash'], json.dumps(blockData)))
|
conn.execute('INSERT INTO latestBlocks(blockNumber, blockHash, jsonData) VALUES (?,?,?)', (blockData['height'], blockData['hash'], json.dumps(blockData)))
|
||||||
#conn.commit()
|
#conn.commit()
|
||||||
conn.close()
|
conn.close()
|
||||||
@ -550,7 +622,13 @@ def transferToken(tokenIdentification, tokenAmount, inputAddress, outputAddress,
|
|||||||
except:
|
except:
|
||||||
logger.info("This is a critical error. Please report to developers")
|
logger.info("This is a critical error. Please report to developers")
|
||||||
|
|
||||||
session = create_database_session_orm('token', {'token_name': f"{tokenIdentification}"}, TokenBase)
|
while True:
|
||||||
|
try:
|
||||||
|
session = create_database_session_orm('token', {'token_name': f"{tokenIdentification}"}, TokenBase)
|
||||||
|
break
|
||||||
|
except:
|
||||||
|
time.sleep(DB_RETRY_TIMEOUT)
|
||||||
|
|
||||||
tokenAmount = float(tokenAmount)
|
tokenAmount = float(tokenAmount)
|
||||||
if isInfiniteToken == True:
|
if isInfiniteToken == True:
|
||||||
# Make new entry
|
# Make new entry
|
||||||
@ -776,13 +854,23 @@ def check_contract_status(contractName, contractAddress):
|
|||||||
# Status of the contract is at 2 tables in system.db
|
# Status of the contract is at 2 tables in system.db
|
||||||
# activecontracts and time_actions
|
# activecontracts and time_actions
|
||||||
# select the last entry form the colum
|
# select the last entry form the colum
|
||||||
connection = create_database_connection('system_dbs')
|
while True:
|
||||||
|
try:
|
||||||
|
connection = create_database_connection('system_dbs')
|
||||||
|
break
|
||||||
|
except:
|
||||||
|
time.sleep(DB_RETRY_TIMEOUT)
|
||||||
contract_status = connection.execute(f'SELECT status FROM time_actions WHERE id=(SELECT MAX(id) FROM time_actions WHERE contractName="{contractName}" AND contractAddress="{contractAddress}")').fetchall()
|
contract_status = connection.execute(f'SELECT status FROM time_actions WHERE id=(SELECT MAX(id) FROM time_actions WHERE contractName="{contractName}" AND contractAddress="{contractAddress}")').fetchall()
|
||||||
return contract_status[0][0]
|
return contract_status[0][0]
|
||||||
|
|
||||||
|
|
||||||
def close_expire_contract(contractStructure, contractStatus, transactionHash, blockNumber, blockHash, incorporationDate, expiryDate, closeDate, trigger_time, trigger_activity, contractName, contractAddress, contractType, tokens_db, parsed_data, blockHeight):
|
def close_expire_contract(contractStructure, contractStatus, transactionHash, blockNumber, blockHash, incorporationDate, expiryDate, closeDate, trigger_time, trigger_activity, contractName, contractAddress, contractType, tokens_db, parsed_data, blockHeight):
|
||||||
connection = create_database_connection('system_dbs', {'db_name':'system'})
|
while True:
|
||||||
|
try:
|
||||||
|
connection = create_database_connection('system_dbs', {'db_name':'system'})
|
||||||
|
break
|
||||||
|
except:
|
||||||
|
time.sleep(DB_RETRY_TIMEOUT)
|
||||||
connection.execute('INSERT INTO activecontracts VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)', (None, contractStructure['contractName'], contractStructure['contractAddress'], contractStatus, contractStructure['tokenIdentification'], contractStructure['contractType'], transactionHash, blockNumber, blockHash, incorporationDate, expiryDate, closeDate))
|
connection.execute('INSERT INTO activecontracts VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)', (None, contractStructure['contractName'], contractStructure['contractAddress'], contractStatus, contractStructure['tokenIdentification'], contractStructure['contractType'], transactionHash, blockNumber, blockHash, incorporationDate, expiryDate, closeDate))
|
||||||
connection.execute('INSERT INTO time_actions VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)', (None, trigger_time, trigger_activity, contractStatus, contractName, contractAddress, contractType, tokens_db, parsed_data, transactionHash, blockHeight))
|
connection.execute('INSERT INTO time_actions VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)', (None, trigger_time, trigger_activity, contractStatus, contractName, contractAddress, contractType, tokens_db, parsed_data, transactionHash, blockHeight))
|
||||||
connection.close()
|
connection.close()
|
||||||
@ -797,13 +885,19 @@ def return_active_deposits(session):
|
|||||||
# find all the deposits which are active
|
# find all the deposits which are active
|
||||||
# todo - sqlalchemy gives me warning with the following method
|
# todo - sqlalchemy gives me warning with the following method
|
||||||
subquery_filter = session.query(TimeActions.id).group_by(TimeActions.transactionHash).having(func.count(TimeActions.transactionHash)==1).subquery()
|
subquery_filter = session.query(TimeActions.id).group_by(TimeActions.transactionHash).having(func.count(TimeActions.transactionHash)==1).subquery()
|
||||||
active_deposits = session.query(TimeActions).filter(TimeActions.id.in_(subquery_filter), TimeActions.status=='active', TimeActions.activity=='contract-deposit').all()
|
active_deposits = session.query(TimeActions).filter(TimeActions.id.in_(subquery_filter.select()), TimeActions.status=='active', TimeActions.activity=='contract-deposit').all()
|
||||||
return active_deposits
|
return active_deposits
|
||||||
|
|
||||||
|
|
||||||
def checkLocal_expiry_trigger_deposit(blockinfo):
|
def checkLocal_expiry_trigger_deposit(blockinfo):
|
||||||
# Connect to system.db with a session
|
# Connect to system.db with a session
|
||||||
systemdb_session = create_database_session_orm('system_dbs', {'db_name':'system'}, SystemBase)
|
while True:
|
||||||
|
try:
|
||||||
|
systemdb_session = create_database_session_orm('system_dbs', {'db_name':'system'}, SystemBase)
|
||||||
|
|
||||||
|
break
|
||||||
|
except:
|
||||||
|
time.sleep(DB_RETRY_TIMEOUT)
|
||||||
timeactions_tx_hashes = []
|
timeactions_tx_hashes = []
|
||||||
active_contracts = return_active_contracts(systemdb_session)
|
active_contracts = return_active_contracts(systemdb_session)
|
||||||
active_deposits = return_active_deposits(systemdb_session)
|
active_deposits = return_active_deposits(systemdb_session)
|
||||||
@ -820,15 +914,23 @@ def checkLocal_expiry_trigger_deposit(blockinfo):
|
|||||||
tx_type = 'trigger'
|
tx_type = 'trigger'
|
||||||
data = [blockinfo['hash'], blockinfo['height'], blockinfo['time'], blockinfo['size'], tx_type]
|
data = [blockinfo['hash'], blockinfo['height'], blockinfo['time'], blockinfo['size'], tx_type]
|
||||||
|
|
||||||
response = requests.get(f'https://stdops.ranchimall.net/hash?data={data}', verify=API_VERIFY)
|
def _get_txid():
|
||||||
if response.status_code == 200:
|
while True:
|
||||||
txid = response.json()
|
try:
|
||||||
elif response.status_code == 404:
|
response = requests.get(f'https://stdops.ranchimall.net/hash?data={data}', verify=API_VERIFY)
|
||||||
logger.info('Internal trigger has failed')
|
if response.status_code == 200:
|
||||||
sys.exit(0)
|
txid = response.json()
|
||||||
|
return txid
|
||||||
|
elif response.status_code == 404:
|
||||||
|
logger.info(f'Internal trigger has failed (404) for getting txid from stdops.ranchimall.net. Retry in {RETRY_TIMEOUT_LONG}s')
|
||||||
|
time.sleep(RETRY_TIMEOUT_LONG)
|
||||||
|
except:
|
||||||
|
logger.info(f'Internal trigger has failed for getting txid from stdops.ranchimall.net. Retry in {RETRY_TIMEOUT_LONG}s')
|
||||||
|
time.sleep(RETRY_TIMEOUT_LONG)
|
||||||
|
|
||||||
|
|
||||||
transaction_data = {}
|
transaction_data = {}
|
||||||
transaction_data['txid'] = txid
|
transaction_data['txid'] = _get_txid()
|
||||||
transaction_data['blockheight'] = blockinfo['height']
|
transaction_data['blockheight'] = blockinfo['height']
|
||||||
transaction_data['time'] = blockinfo['time']
|
transaction_data['time'] = blockinfo['time']
|
||||||
|
|
||||||
@ -954,8 +1056,47 @@ def checkLocal_expiry_trigger_deposit(blockinfo):
|
|||||||
updateLatestTransaction(transaction_data, parsed_data, f"{query.contractName}-{query.contractAddress}")
|
updateLatestTransaction(transaction_data, parsed_data, f"{query.contractName}-{query.contractAddress}")
|
||||||
|
|
||||||
|
|
||||||
|
def check_reorg():
|
||||||
|
connection = create_database_connection('system_dbs')
|
||||||
|
blockbook_api_url = 'https://blockbook.ranchimall.net/'
|
||||||
|
BACK_TRACK_BLOCKS = 1000
|
||||||
|
|
||||||
|
# find latest block number in local database
|
||||||
|
latest_block = list(connection.execute("SELECT max(blockNumber) from latestBlocks").fetchone())[0]
|
||||||
|
block_number = latest_block
|
||||||
|
|
||||||
|
while block_number > 0:
|
||||||
|
# get the block hash
|
||||||
|
block_hash = list(connection.execute(f"SELECT blockHash from latestBlocks WHERE blockNumber = {block_number}").fetchone())[0]
|
||||||
|
|
||||||
|
# Check if the block is in blockbook (i.e, not dropped in reorg)
|
||||||
|
response = requests.get(f'{blockbook_api_url}api/block/{block_number}', verify=API_VERIFY)
|
||||||
|
if response.status_code == 200:
|
||||||
|
response = response.json()
|
||||||
|
if response['hash'] == block_hash: # local blockhash matches with blockbook hash
|
||||||
|
break
|
||||||
|
else: # check for older blocks to trace where reorg has happened
|
||||||
|
block_number -= BACK_TRACK_BLOCKS
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
logger.info('Response from the Blockbook API failed')
|
||||||
|
sys.exit(0) #TODO test reorg fix and remove this
|
||||||
|
|
||||||
|
connection.close()
|
||||||
|
|
||||||
|
# rollback if needed
|
||||||
|
if block_number != latest_block:
|
||||||
|
rollback_to_block(block_number)
|
||||||
|
|
||||||
|
return block_number
|
||||||
|
|
||||||
def extract_contractStructure(contractName, contractAddress):
|
def extract_contractStructure(contractName, contractAddress):
|
||||||
connection = create_database_connection('smart_contract', {'contract_name':f"{contractName}", 'contract_address':f"{contractAddress}"})
|
while True:
|
||||||
|
try:
|
||||||
|
connection = create_database_connection('smart_contract', {'contract_name':f"{contractName}", 'contract_address':f"{contractAddress}"})
|
||||||
|
break
|
||||||
|
except:
|
||||||
|
time.sleep(DB_RETRY_TIMEOUT)
|
||||||
attributevaluepair = connection.execute("SELECT attribute, value FROM contractstructure WHERE attribute != 'flodata'").fetchall()
|
attributevaluepair = connection.execute("SELECT attribute, value FROM contractstructure WHERE attribute != 'flodata'").fetchall()
|
||||||
contractStructure = {}
|
contractStructure = {}
|
||||||
conditionDict = {}
|
conditionDict = {}
|
||||||
@ -2311,10 +2452,16 @@ def processTransaction(transaction_data, parsed_data, blockinfo):
|
|||||||
|
|
||||||
def scanBlockchain():
|
def scanBlockchain():
|
||||||
# Read start block no
|
# Read start block no
|
||||||
session = create_database_session_orm('system_dbs', {'db_name': "system"}, SystemBase)
|
while True:
|
||||||
startblock = int(session.query(SystemData).filter_by(attribute='lastblockscanned').all()[0].value) + 1
|
try:
|
||||||
session.commit()
|
session = create_database_session_orm('system_dbs', {'db_name': "system"}, SystemBase)
|
||||||
session.close()
|
startblock = int(session.query(SystemData).filter_by(attribute='lastblockscanned').all()[0].value) + 1
|
||||||
|
session.commit()
|
||||||
|
session.close()
|
||||||
|
break
|
||||||
|
except:
|
||||||
|
logger.info(f"Unable to connect to 'system' database... retrying in {DB_RETRY_TIMEOUT} seconds")
|
||||||
|
time.sleep(DB_RETRY_TIMEOUT)
|
||||||
|
|
||||||
# todo Rule 6 - Find current block height
|
# todo Rule 6 - Find current block height
|
||||||
# Rule 7 - Start analysing the block contents from starting block to current height
|
# Rule 7 - Start analysing the block contents from starting block to current height
|
||||||
@ -2483,36 +2630,59 @@ IGNORE_BLOCK_LIST = [int(s) for s in IGNORE_BLOCK_LIST]
|
|||||||
IGNORE_TRANSACTION_LIST = config['DEFAULT']['IGNORE_TRANSACTION_LIST'].split(',')
|
IGNORE_TRANSACTION_LIST = config['DEFAULT']['IGNORE_TRANSACTION_LIST'].split(',')
|
||||||
|
|
||||||
|
|
||||||
# Delete database and smartcontract directory if reset is set to 1
|
def create_dir_if_not_exist(dir_path, reset = False):
|
||||||
if args.reset == 1:
|
if os.path.exists(dir_path):
|
||||||
logger.info("Resetting the database. ")
|
if reset:
|
||||||
dirpath = os.path.join(config['DEFAULT']['DATA_PATH'], 'tokens')
|
shutil.rmtree(dir_path)
|
||||||
if os.path.exists(dirpath):
|
os.mkdir(dir_path)
|
||||||
shutil.rmtree(dirpath)
|
else:
|
||||||
os.mkdir(dirpath)
|
os.mkdir(dir_path)
|
||||||
dirpath = os.path.join(config['DEFAULT']['DATA_PATH'], 'smartContracts')
|
|
||||||
if os.path.exists(dirpath):
|
|
||||||
shutil.rmtree(dirpath)
|
|
||||||
os.mkdir(dirpath)
|
|
||||||
dirpath = os.path.join(config['DEFAULT']['DATA_PATH'], 'system.db')
|
|
||||||
if os.path.exists(dirpath):
|
|
||||||
os.remove(dirpath)
|
|
||||||
dirpath = os.path.join(config['DEFAULT']['DATA_PATH'], 'latestCache.db')
|
|
||||||
if os.path.exists(dirpath):
|
|
||||||
os.remove(dirpath)
|
|
||||||
|
|
||||||
# Read start block no
|
def init_system_db(startblock):
|
||||||
startblock = int(config['DEFAULT']['START_BLOCK'])
|
# Initialize system.db
|
||||||
session = create_database_session_orm('system_dbs', {'db_name': "system"}, SystemBase)
|
session = create_database_session_orm('system_dbs', {'db_name': "system"}, SystemBase)
|
||||||
session.add(SystemData(attribute='lastblockscanned', value=startblock - 1))
|
session.add(SystemData(attribute='lastblockscanned', value=startblock - 1))
|
||||||
session.commit()
|
session.commit()
|
||||||
session.close()
|
session.close()
|
||||||
|
|
||||||
|
def init_lastestcache_db():
|
||||||
# Initialize latest cache DB
|
# Initialize latest cache DB
|
||||||
session = create_database_session_orm('system_dbs', {'db_name': "latestCache"}, LatestCacheBase)
|
session = create_database_session_orm('system_dbs', {'db_name': "latestCache"}, LatestCacheBase)
|
||||||
session.commit()
|
session.commit()
|
||||||
session.close()
|
session.close()
|
||||||
|
|
||||||
|
def init_storage_if_not_exist(reset = False):
|
||||||
|
|
||||||
|
token_dir_path = os.path.join(config['DEFAULT']['DATA_PATH'], 'tokens')
|
||||||
|
create_dir_if_not_exist(token_dir_path, reset)
|
||||||
|
|
||||||
|
smart_contract_dir_path = os.path.join(config['DEFAULT']['DATA_PATH'], 'smartContracts')
|
||||||
|
create_dir_if_not_exist(smart_contract_dir_path, reset)
|
||||||
|
|
||||||
|
system_db_path = os.path.join(config['DEFAULT']['DATA_PATH'], 'system.db')
|
||||||
|
if os.path.exists(system_db_path):
|
||||||
|
if reset:
|
||||||
|
os.remove(system_db_path)
|
||||||
|
init_system_db(int(config['DEFAULT']['START_BLOCK']))
|
||||||
|
else:
|
||||||
|
init_system_db(int(config['DEFAULT']['START_BLOCK']))
|
||||||
|
|
||||||
|
|
||||||
|
latestCache_db_path = os.path.join(config['DEFAULT']['DATA_PATH'], 'latestCache.db')
|
||||||
|
if os.path.exists(latestCache_db_path):
|
||||||
|
if reset:
|
||||||
|
os.remove(latestCache_db_path)
|
||||||
|
init_lastestcache_db()
|
||||||
|
else:
|
||||||
|
init_lastestcache_db()
|
||||||
|
|
||||||
|
# Delete database and smartcontract directory if reset is set to 1
|
||||||
|
if args.reset == 1:
|
||||||
|
logger.info("Resetting the database. ")
|
||||||
|
init_storage_if_not_exist(reset=True)
|
||||||
|
else:
|
||||||
|
init_storage_if_not_exist()
|
||||||
|
|
||||||
|
|
||||||
# Determine API source for block and transaction information
|
# Determine API source for block and transaction information
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
@ -2526,4 +2696,4 @@ if __name__ == "__main__":
|
|||||||
# Now we connect to flosight's websocket API to get information about the latest blocks
|
# Now we connect to flosight's websocket API to get information about the latest blocks
|
||||||
# Neturl is the URL for Flosight API whose websocket endpoint is being connected to
|
# Neturl is the URL for Flosight API whose websocket endpoint is being connected to
|
||||||
|
|
||||||
asyncio.get_event_loop().run_until_complete(connect_to_websocket(websocket_uri))
|
asyncio.get_event_loop().run_until_complete(connect_to_websocket(websocket_uri))
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user