Multi-threaded fetches for scanning blocks

- speed up scanBlocks using batched, threaded fetches from the API
- fixes for check_for_reorg and process restart when reorg happens
This commit is contained in:
Sai Raj 2024-08-09 01:02:18 -04:00
parent 1b9f75ebb4
commit 63e5a0da87

View File

@ -5,6 +5,7 @@ import logging
import os
import shutil
import sys
import threading
#import pyflo
import requests
from sqlalchemy import create_engine, func, and_
@ -28,6 +29,8 @@ from src.backend.util_rollback import rollback_to_block
RETRY_TIMEOUT_LONG = 30 * 60 # 30 mins
RETRY_TIMEOUT_SHORT = 60 # 1 min
DB_RETRY_TIMEOUT = 60 # 60 seconds
BLOCK_SYNC_BATCHSIZE = 1000
BACK_TRACK_BLOCKS = 1000
def newMultiRequest(apicall):
current_server = serverlist[0]
@ -496,22 +499,8 @@ def fetchDynamicSwapPrice(contractStructure, blockinfo):
return float(contractStructure['price'])
def processBlock(blockindex=None, blockhash=None):
if blockindex is not None and blockhash is None:
logger.info(f'Processing block {blockindex}')
# Get block details
while blockhash is None or blockhash == '':
response = newMultiRequest(f"block-index/{blockindex}")
try:
blockhash = response['blockHash']
except:
logger.info(f"API call block-index/{blockindex} failed to give proper response. Retrying.")
blockinfo = newMultiRequest(f"block/{blockhash}")
#TODO: Check for reorg in here
def processBlockData(blockinfo):
# Check and perform operations which do not require blockchain intervention
checkLocal_expiry_trigger_deposit(blockinfo)
@ -564,6 +553,53 @@ def processBlock(blockindex=None, blockhash=None):
logger.info(f"Unable to connect to 'system' database... retrying in {DB_RETRY_TIMEOUT} seconds")
time.sleep(DB_RETRY_TIMEOUT)
def fetchBlockData(blockindex, blockhash = None):
    """Fetch full block info for *blockindex* from the API.

    If *blockhash* is not supplied (or is empty), it is first resolved via
    the ``block-index/<n>`` endpoint, retrying until a valid hash comes back.

    Returns the decoded response of the ``block/<hash>`` endpoint.
    """
    logger.info(f'Processing block {blockindex}')
    # Resolve the block hash first if the caller didn't provide one.
    while blockhash is None or blockhash == '':
        response = newMultiRequest(f"block-index/{blockindex}")
        try:
            blockhash = response['blockHash']
        except (KeyError, TypeError):
            # Malformed/unexpected API payload — was a bare `except:` that
            # could also hide programming errors; keep retrying on bad data.
            logger.info(f"API call block-index/{blockindex} failed to give proper response. Retrying.")
    blockinfo = newMultiRequest(f"block/{blockhash}")
    return blockinfo
def processBlock(blockindex=None, blockhash=None):
    """Fetch the block identified by *blockindex*/*blockhash*, then process it."""
    block_data = fetchBlockData(blockindex, blockhash)
    processBlockData(block_data)
def processBlocksInBatch(startIndex, stopIndex, batchsize = BLOCK_SYNC_BATCHSIZE):
    """Scan blocks startIndex..stopIndex, fetching block data concurrently.

    Block data is downloaded in batches of *batchsize*, one thread per block,
    then processed sequentially in block-index order (processing must stay
    linear because later blocks depend on state produced by earlier ones).

    NOTE(review): the range here is inclusive of stopIndex, while the loop
    this replaced used `range(startblock, current_index)` (exclusive) —
    confirm the intended endpoint semantics with the caller.
    """
    i_index = startIndex
    blockinfos = [None] * batchsize
    threads = [None] * batchsize

    def fetchDataAndStore(blockindex, i):
        # Worker: store the fetched block info in this batch slot.
        blockinfos[i] = fetchBlockData(blockindex)

    while i_index <= stopIndex:
        # Reset the slots left over from the previous batch.
        for j in range(batchsize):
            blockinfos[j] = None
            threads[j] = None
        # Spawn one fetch thread per block in this batch.
        for j in range(batchsize):
            if (i_index <= stopIndex) and (i_index not in IGNORE_BLOCK_LIST):
                threads[j] = threading.Thread(target=fetchDataAndStore, args=(i_index, j))
                # BUG FIX: threads were created but never started, so join()
                # below would raise RuntimeError and nothing was ever fetched.
                threads[j].start()
            i_index += 1  # increment blockindex
        # Wait for every fetch thread in the batch to complete.
        for j in range(batchsize):
            if threads[j] is not None:  # slot stays None past stopIndex or for ignored blocks
                threads[j].join()
        # Process the block data linearly, in order of blockindex.
        for j in range(batchsize):
            if threads[j] is not None:
                processBlockData(blockinfos[j])
def updateLatestTransaction(transactionData, parsed_data, db_reference, transactionType=None ):
# connect to latest transaction db
@ -1050,10 +1086,9 @@ def checkLocal_expiry_trigger_deposit(blockinfo):
updateLatestTransaction(transaction_data, parsed_data, f"{query.contractName}-{query.contractAddress}")
def check_reorg():
def check_for_reorg(backtrack_count = BACK_TRACK_BLOCKS):
connection = create_database_connection('system_dbs')
blockbook_api_url = 'https://blockbook.ranchimall.net/'
BACK_TRACK_BLOCKS = 1000
# find latest block number in local database
latest_block = list(connection.execute("SELECT max(blockNumber) from latestBlocks").fetchone())[0]
@ -1064,22 +1099,18 @@ def check_reorg():
block_hash = list(connection.execute(f"SELECT blockHash from latestBlocks WHERE blockNumber = {block_number}").fetchone())[0]
# Check if the block is in blockbook (i.e, not dropped in reorg)
response = requests.get(f'{blockbook_api_url}api/block/{block_number}', verify=API_VERIFY)
if response.status_code == 200:
response = response.json()
if response['hash'] == block_hash: # local blockhash matches with blockbook hash
break
else: # check for older blocks to trace where reorg has happened
block_number -= BACK_TRACK_BLOCKS
continue
else:
logger.info('Response from the Blockbook API failed')
sys.exit(0) #TODO test reorg fix and remove this
response = newMultiRequest(f"block/{block_number}")
if response['hash'] == block_hash: # local blockhash matches with blockbook hash
break
else: # check for older blocks to trace where reorg has happened
block_number -= backtrack_count
continue
connection.close()
# rollback if needed
if block_number != latest_block:
stop_sync_loop() # stop the syncing process
rollback_to_block(block_number)
return block_number
@ -2444,18 +2475,31 @@ def processTransaction(transaction_data, parsed_data, blockinfo):
return 0
def scanBlockchain():
_is_scan_active = False
def scanBlockchain(startup = False):
global _is_scan_active
if _is_scan_active: # if there's already an instance of scan running, do nothing for this instance
return
_is_scan_active = True # set scanning as True, to prevent multiple instances of scanBlockchain from running
# Read start block no
while True:
try:
session = create_database_session_orm('system_dbs', {'db_name': "system"}, SystemBase)
startblock = int(session.query(SystemData).filter_by(attribute='lastblockscanned').all()[0].value) + 1
session.commit()
session.close()
break
except:
logger.info(f"Unable to connect to 'system' database... retrying in {DB_RETRY_TIMEOUT} seconds")
time.sleep(DB_RETRY_TIMEOUT)
if startup:
while True:
try:
session = create_database_session_orm('system_dbs', {'db_name': "system"}, SystemBase)
startblock = int(session.query(SystemData).filter_by(attribute='lastblockscanned').all()[0].value) + 1
session.commit()
session.close()
break
except:
logger.info(f"Unable to connect to 'system' database... retrying in {DB_RETRY_TIMEOUT} seconds")
time.sleep(DB_RETRY_TIMEOUT)
else:
startblock = check_for_reorg() + 1 # returns the current last block scanned (db is rollbacked if reorg happens)
# todo Rule 6 - Find current block height
# Rule 7 - Start analysing the block contents from starting block to current height
@ -2475,13 +2519,17 @@ def scanBlockchain():
logger.info("Current block height is %s" % str(current_index))
break
for blockindex in range(startblock, current_index):
if blockindex in IGNORE_BLOCK_LIST:
continue
processBlock(blockindex=blockindex)
processBlocksInBatch(startblock, current_index)
#for blockindex in range(startblock, current_index):
# if blockindex in IGNORE_BLOCK_LIST:
# continue
# processBlock(blockindex=blockindex)
# At this point the script has updated to the latest block
# Now we connect to Blockbook's websocket API to get information about the latest blocks
if not startup and not isactive_sync_loop():
start_sync_loop()
def switchNeturl(currentneturl):
# Use modulo operation to simplify the logic
@ -2517,7 +2565,14 @@ def get_websocket_uri(testnet=False):
return "wss://blockbook.ranchimall.net/websocket"
async def connect_to_websocket(uri):
# global flag to pass termination when needed
global _isactive_sync
_isactive_sync = True
while True:
if not _isactive_sync:
return
try:
async with websockets.connect(uri) as websocket:
subscription_request = {
@ -2527,6 +2582,9 @@ async def connect_to_websocket(uri):
}
await websocket.send(json.dumps(subscription_request))
while True:
if not _isactive_sync:
websocket.close()
return scanBlockchain()
response = await websocket.recv()
logger.info(f"Received: {response}")
response = json.loads(response)
@ -2537,13 +2595,20 @@ async def connect_to_websocket(uri):
if response['data']['hash'] is None or response['data']['hash']=='':
print('blockhash is none')
# todo: remove these debugger lines
# If this is the issue need to proceed forward only once blockbook has consolidated
# If this is the issue need to proceed forward only once blockbook has consolidated
check_for_reorg()
if not _isactive_sync: #if reorg happens, _isactive_sync becomes False as sync is closed
websocket.close()
return scanBlockchain()
processBlock(blockindex=response['data']['height'], blockhash=response['data']['hash'])
except Exception as e:
logger.info(f"Connection error: {e}")
# Add a delay before attempting to reconnect
await asyncio.sleep(5) # You can adjust the delay as needed
if not _isactive_sync:
return
scanBlockchain()
def create_dir_if_not_exist(dir_path, reset = False):
@ -2661,6 +2726,24 @@ def initiate_process():
#IGNORE_BLOCK_LIST = [int(s) for s in IGNORE_BLOCK_LIST]
#IGNORE_TRANSACTION_LIST = _config['IGNORE_TRANSACTION_LIST']
def start_sync_loop():
    # Run the websocket listener on the asyncio event loop and remember the
    # loop in the module-global `_sync_loop` so stop_sync_loop() can halt it.
    # This call blocks until connect_to_websocket() returns.
    global _sync_loop
    _sync_loop = asyncio.get_event_loop()
    _sync_loop.run_until_complete(connect_to_websocket(websocket_uri))
def isactive_sync_loop():
    """Return True when the websocket sync event loop is currently set up.

    BUG FIX: the previous version returned True when `_sync_loop` was None,
    i.e. it reported "active" exactly when no sync loop existed. That
    inverted the caller's check (`if not startup and not isactive_sync_loop():
    start_sync_loop()`), so the sync loop was never (re)started after a
    non-startup scan and would be "restarted" while already running.
    """
    return _sync_loop is not None
def stop_sync_loop():
    # Signal the websocket listener to terminate and tear down its event loop.
    global _sync_loop, _isactive_sync
    # Checked inside connect_to_websocket() so its loops exit cleanly.
    _isactive_sync = False
    if (_sync_loop is not None) and _sync_loop.is_running():
        _sync_loop.stop()
    _sync_loop = None
def start_backend_process(config, reset = False):
global _config
_config = config
@ -2668,13 +2751,12 @@ def start_backend_process(config, reset = False):
init_storage_if_not_exist(reset)
# MAIN LOGIC STARTS
# scan from the latest block saved locally to latest network block
scanBlockchain()
scanBlockchain(startup=True)
logger.debug("Completed first scan")
# At this point the script has updated to the latest block
# Now we connect to Blockbook's websocket API to get information about the latest blocks
# Neturl is the URL for Blockbook API whose websocket endpoint is being connected to
asyncio.get_event_loop().run_until_complete(connect_to_websocket(websocket_uri))
start_sync_loop()
"""
# Determine API source for block and transaction information