Further changes for blockbook shifting:

1. Removed socketio
2. Added support for blockbook's websocket
3. Fixed a bug reg transaction processing. Blockbook doesn't return floData key in a transaction's details API response if floData is not present in the transaction. Updated code to handle this
This commit is contained in:
RanchiMall Dev 2023-10-27 11:13:09 +00:00
parent bb77a9723b
commit a83ed33a99
2 changed files with 60 additions and 87 deletions

View File

@ -8,7 +8,7 @@ idna==2.10
pycparser==2.20
python-dateutil==2.8.1
python-engineio==3.14.2
python-socketio==4.6.1
websockets==12.0
requests==2.25.0
six==1.16.0
SQLAlchemy==1.4.18

View File

@ -1,13 +1,12 @@
import argparse
import configparser
import json
import json
import logging
import os
import shutil
import sys
import pyflo
import requests
import socketio
from sqlalchemy import create_engine, func, and_
from sqlalchemy.orm import sessionmaker
import time
@ -18,6 +17,8 @@ from datetime import datetime
from ast import literal_eval
from models import SystemData, TokenBase, ActiveTable, ConsumedTable, TransferLogs, TransactionHistory, TokenContractAssociation, ContractBase, ContractStructure, ContractParticipants, ContractTransactionHistory, ContractDeposits, ConsumedInfo, ContractWinners, ContinuosContractBase, ContractStructure2, ContractParticipants2, ContractDeposits2, ContractTransactionHistory2, SystemBase, ActiveContracts, SystemData, ContractAddressMapping, TokenAddressMapping, DatabaseTypeMapping, TimeActions, RejectedContractTransactionHistory, RejectedTransactionHistory, LatestCacheBase, LatestTransactions, LatestBlocks
from statef_processing import process_stateF
import asyncio
import websockets
import pdb
@ -55,7 +56,7 @@ def process_committee_flodata(flodata):
try:
contract_committee_actions = flodata['token-tracker']['contract-committee']
except KeyError:
print('Flodata related to contract committee')
logger.info('Flodata related to contract committee')
else:
# Adding first and removing later to maintain consistency and not to depend on floData for order of execution
for action in contract_committee_actions.keys():
@ -76,7 +77,7 @@ def refresh_committee_list_old(admin_flo_id, api_url, blocktime):
if response.status_code == 200:
response = response.json()
else:
print('Response from the Flosight API failed')
logger.info('Response from the Flosight API failed')
sys.exit(0)
committee_list = []
@ -114,7 +115,7 @@ def refresh_committee_list(admin_flo_id, api_url, blocktime):
if response.status_code == 200:
return response.json()
else:
print('Response from the Flosight API failed')
logger.info('Response from the Flosight API failed')
sys.exit(0)
url = f'{api_url}api/v1/address/{admin_flo_id}?details=txs'
@ -153,7 +154,7 @@ def find_sender_receiver(transaction_data):
temp = item[0]
continue
if item[0] != temp:
print(f"System has found more than one address as part of vin. Transaction {transaction_data['txid']} is rejected")
logger.info(f"System has found more than one address as part of vin. Transaction {transaction_data['txid']} is rejected")
return 0
inputlist = [vinlist[0][0], totalinputval]
@ -161,7 +162,7 @@ def find_sender_receiver(transaction_data):
# todo Rule 42 - If the number of vout is more than 2, reject the transaction
if len(transaction_data["vout"]) > 2:
print(f"System has found more than 2 address as part of vout. Transaction {transaction_data['txid']} is rejected")
logger.info(f"System has found more than 2 address as part of vout. Transaction {transaction_data['txid']} is rejected")
return 0
# todo Rule 43 - A transaction accepted by the system has two vouts, 1. The FLO address of the receiver
@ -172,17 +173,16 @@ def find_sender_receiver(transaction_data):
addresscounter = 0
inputcounter = 0
for obj in transaction_data["vout"]:
if obj["scriptPubKey"]["type"] == "pubkeyhash":
addresscounter = addresscounter + 1
if inputlist[0] == obj["scriptPubKey"]["addresses"][0]:
inputcounter = inputcounter + 1
continue
outputlist.append([obj["scriptPubKey"]["addresses"][0], obj["value"]])
addresscounter = addresscounter + 1
if inputlist[0] == obj["scriptPubKey"]["addresses"][0]:
inputcounter = inputcounter + 1
continue
outputlist.append([obj["scriptPubKey"]["addresses"][0], obj["value"]])
if addresscounter == inputcounter:
outputlist = [inputlist[0]]
elif len(outputlist) != 1:
print(f"Transaction's change is not coming back to the input address. Transaction {transaction_data['txid']} is rejected")
logger.info(f"Transaction's change is not coming back to the input address. Transaction {transaction_data['txid']} is rejected")
return 0
else:
outputlist = outputlist[0]
@ -413,7 +413,10 @@ def fetchDynamicSwapPrice(contractStructure, blockinfo):
return float(contractStructure['price'])
else:
for transaction in response['txs']:
floData = transaction['floData']
if 'floData' in transaction.keys():
floData = transaction['floData']
else:
floData = ''
# If the blocktime of the transaction is < than the current block time
if transaction['time'] < blockinfo['time']:
# Check if flodata is in the format we are looking for
@ -436,38 +439,6 @@ def fetchDynamicSwapPrice(contractStructure, blockinfo):
logger.info('API error fetchDynamicSwapPrice')
sys.exit(0)
# Chain query
if 'incomplete' in response.keys():
is_incomplete_key_present = True
init_id = response['initItem']
while(is_incomplete_key_present == True):
response = requests.get(f'{api_url}api/v1/address/{oracle_address}?details=txs', verify=API_VERIFY)
if response.status_code == 200:
response = response.json()
for transaction in response['txs']:
floData = transaction['floData']
# If the blocktime of the transaction is < than the current block time
if transaction['time'] < blockinfo['time']:
# Check if flodata is in the format we are looking for
# ie. {"price-update":{"contract-name": "", "contract-address": "", "price": 3}}
# and receiver address should be contractAddress
try:
sender_address, receiver_address = find_sender_receiver(transaction)
assert sender_address == oracle_address
assert receiver_address == contractStructure['contractAddress']
floData = json.loads(floData)
# Check if the contract name and address are right
assert floData['price-update']['contract-name'] == contractStructure['contractName']
assert floData['price-update']['contract-address'] == contractStructure['contractAddress']
return float(floData['price-update']['price'])
except:
continue
else:
continue
else:
logger.info('API error fetchDynamicSwapPrice')
sys.exit(0)
return float(contractStructure['price'])
@ -492,8 +463,8 @@ def processBlock(blockindex=None, blockhash=None):
for transaction_data in blockinfo["txs"]:
transaction = transaction_data["txid"]
# if transaction == 'cd1176b2567ca2ae15624962008d3d935ebd36b99e419f5ad745dadd5858669f':
# pdb.set_trace()
if transaction in ['f09b63a9f9bc5412c2e339196994441f99cf46e6fa98a0656cba62d7f2cad9c8', '452f964d8923515c9f58b45a9bfdd1ac288c38f740149222b83989958e764d1e']:
pass
try:
text = transaction_data["floData"]
@ -513,6 +484,8 @@ def processBlock(blockindex=None, blockhash=None):
elif returnval == 0:
logger.info("Transfer for the transaction %s is illegitimate. Moving on" % transaction)
logger.info("Completed tx loop")
if len(acceptedTxList) > 0:
tempinfo = blockinfo['txs'].copy()
for tx in blockinfo['txs']:
@ -567,7 +540,7 @@ def transferToken(tokenIdentification, tokenAmount, inputAddress, outputAddress,
try:
transactionType=parsed_data['type']
except:
print("This is a critical error. Please report to developers")
logger.info("This is a critical error. Please report to developers")
session = create_database_session_orm('token', {'token_name': f"{tokenIdentification}"}, TokenBase)
tokenAmount = float(tokenAmount)
@ -878,7 +851,6 @@ def checkLocal_expiry_trigger_deposit(blockinfo):
tokenAmount_sum = connection.execute('SELECT IFNULL(sum(tokenAmount), 0) FROM contractparticipants').fetchall()[0][0]
if tokenAmount_sum >= maximumsubscriptionamount:
# Trigger the contract
# pdb.set_trace()
success_returnval = trigger_internal_contract(tokenAmount_sum, contractStructure, transaction_data, blockinfo, parsed_data, connection, contract_name=query.contractName, contract_address=query.contractAddress, transaction_subType='maximumsubscriptionamount')
if not success_returnval:
return 0
@ -894,7 +866,6 @@ def checkLocal_expiry_trigger_deposit(blockinfo):
# Trigger the contract
tokenAmount_sum = connection.execute('SELECT IFNULL(sum(tokenAmount), 0) FROM contractparticipants').fetchall()[0][0]
# pdb.set_trace()
success_returnval = trigger_internal_contract(tokenAmount_sum, contractStructure, transaction_data, blockinfo, parsed_data, connection, contract_name=query.contractName, contract_address=query.contractAddress, transaction_subType='expiryTime')
if not success_returnval:
return 0
@ -2363,6 +2334,36 @@ def reconnectWebsocket(socket_variable):
i=i+1
def get_websocket_uri(testnet=False):
if testnet:
return "wss://blockbook-testnet.ranchimall.net/websocket"
else:
return "wss://blockbook.ranchimall.net/websocket"
async def connect_to_websocket(uri):
while True:
try:
async with websockets.connect(uri) as websocket:
subscription_request = {
"id": "0",
"method": "subscribeNewBlock",
"params": {}
}
await websocket.send(json.dumps(subscription_request))
while True:
response = await websocket.recv()
logger.info(f"Received: {response}")
response = json.loads(response)
if 'height' in response['data'].keys():
processBlock(blockindex=response['data']['height'], blockhash=response['data']['hash'])
except Exception as e:
logger.info(f"Connection error: {e}")
# Add a delay before attempting to reconnect
await asyncio.sleep(5) # You can adjust the delay as needed
scanBlockchain()
# MAIN EXECUTION STARTS
# Configuration of required variables
config = configparser.ConfigParser()
@ -2394,6 +2395,7 @@ logger.addHandler(stream_handler)
parser = argparse.ArgumentParser(description='Script tracks RMT using FLO data on the FLO blockchain - https://flo.cash')
parser.add_argument('-r', '--reset', nargs='?', const=1, type=int, help='Purge existing db and rebuild it from scratch')
parser.add_argument('-rb', '--rebuild', nargs='?', const=1, type=int, help='Rebuild it')
parser.add_argument("--testnet", action="store_true", help="Use the testnet URL")
args = parser.parse_args()
dirpath = os.path.join(config['DEFAULT']['DATA_PATH'], 'tokens')
@ -2416,9 +2418,11 @@ serverlist = None
if config['DEFAULT']['NET'] == 'mainnet':
serverlist = config['DEFAULT']['MAINNET_FLOSIGHT_SERVER_LIST']
APP_ADMIN = 'FNcvkz9PZNZM3HcxM1XTrVL4tgivmCkHp9'
websocket_uri = get_websocket_uri(testnet=False)
elif config['DEFAULT']['NET'] == 'testnet':
serverlist = config['DEFAULT']['TESTNET_FLOSIGHT_SERVER_LIST']
APP_ADMIN = 'oWooGLbBELNnwq8Z5YmjoVjw8GhBGH3qSP'
websocket_uri = get_websocket_uri(testnet=True)
serverlist = serverlist.split(',')
neturl = config['DEFAULT']['FLOSIGHT_NETURL']
api_url = neturl
@ -2474,41 +2478,10 @@ if __name__ == "__main__":
# scan from the latest block saved locally to latest network block
scanBlockchain()
logger.debug("Completed first scan")
# At this point the script has updated to the latest block
# Now we connect to flosight's websocket API to get information about the latest blocks
# Neturl is the URL for Flosight API whose websocket endpoint is being connected to
sio = socketio.Client()
# Connect to a websocket endpoint and wait for further events
reconnectWebsocket(sio)
#sio.connect(f"{neturl}socket.io/socket.io.js")
@sio.on('connect')
def token_connect():
current_time=datetime.now().strftime('%H:%M:%S')
logger.info(f"Token Tracker has connected to websocket endpoint. Time : {current_time}")
sio.emit('subscribe', 'inv')
@sio.on('disconnect')
def token_disconnect():
current_time = datetime.now().strftime('%H:%M:%S')
logger.info(f"disconnect block: Token Tracker disconnected from websocket endpoint. Time : {current_time}")
logger.info('disconnect block: Triggering client disconnect')
sio.disconnect()
logger.info('disconnect block: Finished triggering client disconnect')
reconnectWebsocket(sio)
@sio.on('connect_error')
def connect_error():
current_time = datetime.now().strftime('%H:%M:%S')
logger.info(f"connection error block: Token Tracker disconnected from websocket endpoint. Time : {current_time}")
logger.info('connection error block: Triggering client disconnect')
sio.disconnect()
logger.info('connection error block: Finished triggering client disconnect')
reconnectWebsocket(sio)
@sio.on('block')
def on_block(data):
logger.info('New block received')
logger.info(str(data))
processBlock(blockhash=data)
asyncio.get_event_loop().run_until_complete(connect_to_websocket(websocket_uri))