diff --git a/src/backend/backend_main.py b/src/backend/backend_main.py index f80abdf..ec16d79 100644 --- a/src/backend/backend_main.py +++ b/src/backend/backend_main.py @@ -5,6 +5,7 @@ import logging import os import shutil import sys +import threading #import pyflo import requests from sqlalchemy import create_engine, func, and_ @@ -28,6 +29,8 @@ from src.backend.util_rollback import rollback_to_block RETRY_TIMEOUT_LONG = 30 * 60 # 30 mins RETRY_TIMEOUT_SHORT = 60 # 1 min DB_RETRY_TIMEOUT = 60 # 60 seconds +BLOCK_SYNC_BATCHSIZE = 1000 +BACK_TRACK_BLOCKS = 1000 def newMultiRequest(apicall): current_server = serverlist[0] @@ -496,22 +499,8 @@ def fetchDynamicSwapPrice(contractStructure, blockinfo): return float(contractStructure['price']) - -def processBlock(blockindex=None, blockhash=None): - if blockindex is not None and blockhash is None: - logger.info(f'Processing block {blockindex}') - # Get block details - while blockhash is None or blockhash == '': - response = newMultiRequest(f"block-index/{blockindex}") - try: - blockhash = response['blockHash'] - except: - logger.info(f"API call block-index/{blockindex} failed to give proper response. Retrying.") - - blockinfo = newMultiRequest(f"block/{blockhash}") - - #TODO: Check for reorg in here - +def processBlockData(blockinfo): + # Check and perform operations which do not require blockchain intervention checkLocal_expiry_trigger_deposit(blockinfo) @@ -564,6 +553,53 @@ def processBlock(blockindex=None, blockhash=None): logger.info(f"Unable to connect to 'system' database... retrying in {DB_RETRY_TIMEOUT} seconds") time.sleep(DB_RETRY_TIMEOUT) +def fetchBlockData(blockindex, blockhash = None): + logger.info(f'Processing block {blockindex}') + # Get block details + while blockhash is None or blockhash == '': + response = newMultiRequest(f"block-index/{blockindex}") + try: + blockhash = response['blockHash'] + except: + logger.info(f"API call block-index/{blockindex} failed to give proper response. Retrying.") + blockinfo = newMultiRequest(f"block/{blockhash}") + return blockinfo + +def processBlock(blockindex=None, blockhash=None): + blockinfo = fetchBlockData(blockindex, blockhash) + processBlockData(blockinfo) + +def processBlocksInBatch(startIndex, stopIndex, batchsize = BLOCK_SYNC_BATCHSIZE): + i_index = startIndex + blockinfos = [None] * batchsize + threads = [None] * batchsize + + def fetchDataAndStore(blockindex, i): + blockinfos[i] = fetchBlockData(blockindex) + + while i_index <= stopIndex: + + # clear blockinfo array + for j in range(batchsize): + blockinfos[j] = None + threads[j] = None + + # fetch data for blocks + for j in range(batchsize): + if (i_index <= stopIndex) and (i_index not in IGNORE_BLOCK_LIST): + threads[j] = threading.Thread(target=fetchDataAndStore, args=(i_index, j)) + i_index += 1 # increment blockindex + + # wait for all threads in the batch to complete + for j in range(batchsize): + if threads[j] is not None: # if i_index > stopIndex or in ignore list, then threads[j] will be None + threads[j].join() + + # process the blockdata in linear (order of blockindex) + for j in range(batchsize): + if threads[j] is not None: # if i_index > stopIndex or in ignore list, then threads[j] will be None + processBlockData(blockinfos[j]) + def updateLatestTransaction(transactionData, parsed_data, db_reference, transactionType=None ): # connect to latest transaction db @@ -1050,10 +1086,9 @@ def checkLocal_expiry_trigger_deposit(blockinfo): updateLatestTransaction(transaction_data, parsed_data, f"{query.contractName}-{query.contractAddress}") -def check_reorg(): +def check_for_reorg(backtrack_count = BACK_TRACK_BLOCKS): + connection = create_database_connection('system_dbs') - blockbook_api_url = 'https://blockbook.ranchimall.net/' - BACK_TRACK_BLOCKS = 1000 # find latest block number in local database latest_block = list(connection.execute("SELECT max(blockNumber) from latestBlocks").fetchone())[0] @@ -1064,22 +1099,18 @@ def check_reorg(): block_hash = list(connection.execute(f"SELECT blockHash from latestBlocks WHERE blockNumber = {block_number}").fetchone())[0] # Check if the block is in blockbook (i.e, not dropped in reorg) - response = requests.get(f'{blockbook_api_url}api/block/{block_number}', verify=API_VERIFY) - if response.status_code == 200: - response = response.json() - if response['hash'] == block_hash: # local blockhash matches with blockbook hash - break - else: # check for older blocks to trace where reorg has happened - block_number -= BACK_TRACK_BLOCKS - continue - else: - logger.info('Response from the Blockbook API failed') - sys.exit(0) #TODO test reorg fix and remove this + response = newMultiRequest(f"block/{block_number}") + if response['hash'] == block_hash: # local blockhash matches with blockbook hash + break + else: # check for older blocks to trace where reorg has happened + block_number -= backtrack_count + continue connection.close() # rollback if needed if block_number != latest_block: + stop_sync_loop() # stop the syncing process rollback_to_block(block_number) return block_number @@ -2444,18 +2475,31 @@ def processTransaction(transaction_data, parsed_data, blockinfo): return 0 -def scanBlockchain(): +_is_scan_active = False + +def scanBlockchain(startup = False): + + global _is_scan_active + + if _is_scan_active: # if there's already an instance of scan running, do nothing for this instance + return + + _is_scan_active = True # set scanning as True, to prevent multiple instances of scanBlockchain from running + # Read start block no - while True: - try: - session = create_database_session_orm('system_dbs', {'db_name': "system"}, SystemBase) - startblock = int(session.query(SystemData).filter_by(attribute='lastblockscanned').all()[0].value) + 1 - session.commit() - session.close() - break - except: - logger.info(f"Unable to connect to 'system' database... retrying in {DB_RETRY_TIMEOUT} seconds") - time.sleep(DB_RETRY_TIMEOUT) + if startup: + while True: + try: + session = create_database_session_orm('system_dbs', {'db_name': "system"}, SystemBase) + startblock = int(session.query(SystemData).filter_by(attribute='lastblockscanned').all()[0].value) + 1 + session.commit() + session.close() + break + except: + logger.info(f"Unable to connect to 'system' database... retrying in {DB_RETRY_TIMEOUT} seconds") + time.sleep(DB_RETRY_TIMEOUT) + else: + startblock = check_for_reorg() + 1 # returns the current last block scanned (db is rollbacked if reorg happens) # todo Rule 6 - Find current block height # Rule 7 - Start analysing the block contents from starting block to current height @@ -2475,13 +2519,17 @@ def scanBlockchain(): logger.info("Current block height is %s" % str(current_index)) break - for blockindex in range(startblock, current_index): - if blockindex in IGNORE_BLOCK_LIST: - continue - processBlock(blockindex=blockindex) + processBlocksInBatch(startblock, current_index) + + #for blockindex in range(startblock, current_index): + # if blockindex in IGNORE_BLOCK_LIST: + # continue + # processBlock(blockindex=blockindex) # At this point the script has updated to the latest block # Now we connect to Blockbook's websocket API to get information about the latest blocks + if not startup and not isactive_sync_loop(): + start_sync_loop() def switchNeturl(currentneturl): # Use modulo operation to simplify the logic @@ -2517,7 +2565,14 @@ def get_websocket_uri(testnet=False): return "wss://blockbook.ranchimall.net/websocket" async def connect_to_websocket(uri): + + # global flag to pass termination when needed + global _isactive_sync + _isactive_sync = True + while True: + if not _isactive_sync: + return try: async with websockets.connect(uri) as websocket: subscription_request = { @@ -2527,6 +2582,9 @@ async def connect_to_websocket(uri): } await websocket.send(json.dumps(subscription_request)) while True: + if not _isactive_sync: + websocket.close() + return scanBlockchain() response = await websocket.recv() logger.info(f"Received: {response}") response = json.loads(response) @@ -2537,13 +2595,20 @@ async def connect_to_websocket(uri): if response['data']['hash'] is None or response['data']['hash']=='': print('blockhash is none') # todo: remove these debugger lines - # If this is the issue need to proceed forward only once blockbook has consolitated + # If this is the issue need to proceed forward only once blockbook has consolitated + + check_for_reorg() + if not _isactive_sync: #if reorg happens, _isactive_sync becomes False as sync is closed + websocket.close() + return scanBlockchain() processBlock(blockindex=response['data']['height'], blockhash=response['data']['hash']) except Exception as e: logger.info(f"Connection error: {e}") # Add a delay before attempting to reconnect await asyncio.sleep(5) # You can adjust the delay as needed + if not _isactive_sync: + return scanBlockchain() def create_dir_if_not_exist(dir_path, reset = False): @@ -2661,6 +2726,24 @@ def initiate_process(): #IGNORE_BLOCK_LIST = [int(s) for s in IGNORE_BLOCK_LIST] #IGNORE_TRANSACTION_LIST = _config['IGNORE_TRANSACTION_LIST'] +def start_sync_loop(): + global _sync_loop + _sync_loop = asyncio.get_event_loop() + _sync_loop.run_until_complete(connect_to_websocket(websocket_uri)) + +def isactive_sync_loop(): + if _sync_loop is None: + return True + else: + return False + +def stop_sync_loop(): + global _sync_loop, _isactive_sync + _isactive_sync = False + if (_sync_loop is not None) and _sync_loop.is_running(): + _sync_loop.stop() + _sync_loop = None + def start_backend_process(config, reset = False): global _config _config = config @@ -2668,13 +2751,12 @@ def start_backend_process(config, reset = False): init_storage_if_not_exist(reset) # MAIN LOGIC STARTS # scan from the latest block saved locally to latest network block - scanBlockchain() + scanBlockchain(startup=True) logger.debug("Completed first scan") # At this point the script has updated to the latest block # Now we connect to Blockbook's websocket API to get information about the latest blocks # Neturl is the URL for Blockbook API whose websocket endpoint is being connected to - - asyncio.get_event_loop().run_until_complete(connect_to_websocket(websocket_uri)) + start_sync_loop() """ # Determine API source for block and transaction information