From f251be29f0aad93c78725bd773b7cea71022f711 Mon Sep 17 00:00:00 2001 From: Abhishek Sinha Date: Thu, 27 Jun 2019 16:39:32 +0530 Subject: [PATCH] added code to sync primary or backup data based on hash, timestamp and vector clock filter --- supernode/index.html | 246 ++++++++++++++++++++++++++----------------- 1 file changed, 152 insertions(+), 94 deletions(-) diff --git a/supernode/index.html b/supernode/index.html index bc85fe0..92c0f49 100644 --- a/supernode/index.html +++ b/supernode/index.html @@ -9635,8 +9635,8 @@ btc_mainnet: "https://blockexplorer.com", btc_testnet: "https://testnet.blockexplorer.com", flo_mainnet: "http://flosight.duckdns.org", - flo_testnet: "http://testnet-flosight.duckdns.org" - //flo_testnet: "https://testnet.flocha.in" + //flo_testnet: "http://testnet-flosight.duckdns.org" + flo_testnet: "https://testnet.flocha.in" }, writable: false, configurable: false, @@ -10161,35 +10161,77 @@ sync_primary_supernode_from_backup_supernode: function (primary_su="", backup_su="") { const RM_RPC = new localbitcoinplusplus.rpc; - RM_RPC.send_rpc.call(this, - "sync_primary_supernode_from_backup_supernode", { - "trader_flo_address": primary_su, - "job": "SYNC_PRIMARY_SUPERNODE_DB_WITH_BACKUP_SUPERNODE_DB", - "receiver_flo_address": backup_su, - }).then(sync_request=>{ - if (typeof localbitcoinplusplus.backupWS[backup_su]=="object") { - doSend(sync_request, backup_su); - } else { - doSend(sync_request); - } - }); + // RM_RPC.send_rpc.call(this, + // "sync_primary_supernode_from_backup_supernode", { + // "trader_flo_address": primary_su, + // "job": "SYNC_PRIMARY_SUPERNODE_DB_WITH_BACKUP_SUPERNODE_DB", + // "receiver_flo_address": backup_su, + // }).then(sync_request=>{ + // if (typeof localbitcoinplusplus.backupWS[backup_su]=="object") { + // doSend(sync_request, backup_su); + // } else { + // doSend(sync_request); + // } + // }); + + // First check if you yourself have the right data to serve + // If not, either get the data or don't serve the users of + // that dead supernode. + + const tableArray = ["deposit", "withdraw_cash", "withdraw_btc", "cash_balances", + "crypto_balances", "buyOrders", "sellOrders", "system_btc_reserves_private_keys"]; + + const su_db_data = await localbitcoinplusplus.actions.get_sharable_db_data(tableArray, primary_su); + + const dbHashData = await localbitcoinplusplus.actions.getDBTablesLatestHashAndTimestamp(primary_su, su_db_data); + + // Now you have db tables timestamp and tables hashes. Send it to other supernodes to check + // if you have the latest data. If you don't have the latest data, someone + // will send you the latest data which you can verify before updating. + + RM_RPC + .send_rpc + .call(this, "do_you_have_latest_data_for_this_supernode", dbHashData) + .then(server_sync_response=>doSend(server_sync_response)); + }, sync_backup_supernode_from_backup_supernode: function (requester="", receiver="", flo_addr_of_backup="") { const RM_RPC = new localbitcoinplusplus.rpc; - RM_RPC.send_rpc.call(this, - "sync_backup_supernode_from_backup_supernode", { - "trader_flo_address": flo_addr_of_backup, - "job": "SYNC_BACKUP_SUPERNODE_DB_WITH_BACKUP_SUPERNODE_DB", - "receiver_flo_address": receiver, - "requester_flo_id": requester - }).then(sync_request=>{ - if (typeof localbitcoinplusplus.backupWS[receiver]=="object") { - doSend(sync_request, receiver); - } else { - doSend(sync_request); - } - }); + // RM_RPC.send_rpc.call(this, + // "sync_backup_supernode_from_backup_supernode", { + // "trader_flo_address": flo_addr_of_backup, + // "job": "SYNC_BACKUP_SUPERNODE_DB_WITH_BACKUP_SUPERNODE_DB", + // "receiver_flo_address": receiver, + // "requester_flo_id": requester + // }).then(sync_request=>{ + // if (typeof localbitcoinplusplus.backupWS[receiver]=="object") { + // doSend(sync_request, receiver); + // } else { + // doSend(sync_request); + // } + // }); + + // First check if you yourself have the right data to serve + // If not, either get the data or don't serve the users of + // that dead supernode. + + const tableArray = ["deposit", "withdraw_cash", "withdraw_btc", "cash_balances", + "crypto_balances", "buyOrders", "sellOrders", "system_btc_reserves_private_keys"]; + + const su_db_data = await localbitcoinplusplus.actions.get_sharable_db_data(tableArray, flo_addr_of_backup); + + const dbHashData = await localbitcoinplusplus.actions.getDBTablesLatestHashAndTimestamp(flo_addr_of_backup, su_db_data); + + // Now you have db tables timestamp and tables hashes. Send it to other supernodes to check + // if you have the latest data. If you don't have the latest data, someone + // will send you the latest data which you can verify before updating. + + RM_RPC + .send_rpc + .call(this, "do_you_have_latest_data_for_this_supernode", dbHashData) + .then(server_sync_response=>doSend(server_sync_response)); + }, get_sharable_db_data: async function (dbTableNamesArray, backup_db="") { @@ -10280,7 +10322,7 @@ const subjectDB = (supernode_flo_id==localbitcoinplusplus.wallets.my_local_flo_address) ? "":supernode_flo_id; - let dbDataOfSupernode; + let dbDataOfSupernode=actual_db_data; if (actual_db_data==false) { if (typeof tableArray!=="object") { @@ -10304,21 +10346,13 @@ const dbDataOfSupernodeStr = JSON.stringify(dbDataOfSupernode); const dbDataOfSupernodeHash = Crypto.SHA256(dbDataOfSupernodeStr); - myArray["id"] = `SU_DB_${su}`; + myArray["id"] = `SU_DB_${supernode_flo_id}`; myArray["DBHash"] = dbDataOfSupernodeHash; - myArray["trader_flo_address"] = localbitcoinplusplus.wallets.my_local_flo_address; - myArray["data_of"] = su; + myArray["trader_flo_address"] = supernode_flo_id; + myArray["data_of"] = supernode_flo_id; myArray["higestTimestampList"] = {...higestTimestampList}; return myArray; - - // const RM_RPC = new localbitcoinplusplus.rpc; - - // RM_RPC - // .send_rpc - // .call(this, "do_you_have_latest_data_for_this_supernode", myArray) - // .then(supernodesDbHash_response=> - // doSend(supernodesDbHash_response)); } return false; @@ -16689,8 +16723,7 @@ } if(data.vectorClock > myOwnDBData.vectorClock) { // You have old data, update respective DB. - data.increaseVectorClock = false; - updateinDB(dbTable, data).then(()=>{ + updateinDB(dbTable, data, data.id, false, false).then(()=>{ showMessage(`INFO: Data updated in ${dbTable} for id ${data.id}.`); }); } @@ -16730,7 +16763,7 @@ for (var prop in obj) { if (!obj.hasOwnProperty(prop)) continue; //if(obj[prop].) - await updateinDB(tableStoreName, obj[prop], obj[prop].id, true) + await updateinDB(tableStoreName, obj[prop], obj[prop].id, true, false) .then(()=>{ showMessage(`INFO: "${tableStoreName}" datastore syncing is complete.`); }); @@ -16819,7 +16852,7 @@ for (var prop in obj) { if (!obj.hasOwnProperty(prop)) continue; await localbitcoinplusplus.newBackupDatabase.db[su_db_data.trader_flo_address] - .backup_updateinDB(tableStoreName, obj[prop], obj[prop].id, true) + .backup_updateinDB(tableStoreName, obj[prop], obj[prop].id, true, false) .then(()=>{ showMessage(`INFO: "${tableStoreName}" datastore syncing is complete.`); }); @@ -19144,7 +19177,7 @@ res_obj.nodePubKey ); if (isBalanceLegit) { - backup_server_db_instance.backup_updateinDB("deposits", + backup_server_db_instance.backup_updateinDB("deposit", updateUserDepositsResponseObject.updatedTraderDepositObject); if (localbitcoinplusplus.wallets.my_local_flo_address == updateUserDepositsResponseObject.trader_flo_address) { @@ -19235,7 +19268,7 @@ if (!obj.hasOwnProperty(prop)) continue; await localbitcoinplusplus.newBackupDatabase.db[su_db_data.trader_flo_address] .backup_updateinDB(tableStoreName, obj[prop], obj[prop] - .id, true).then(()=>{ + .id, true, false).then(()=>{ showMessage(`INFO: "${tableStoreName}" datastore syncing is complete.`); }); } @@ -19273,14 +19306,21 @@ localbitcoinplusplus.kademlia.determineClosestSupernode(res_obj.params[0].trader_flo_address) .then(async my_closest_su_list=>{ const primarySupernodeOfThisUser = my_closest_su_list[0].data.id; - const backup_server_db_instance = localbitcoinplusplus.newBackupDatabase.db[primarySupernodeOfThisUser]; - if(typeof backup_server_db_instance !== "object" - || backup_server_db_instance==localbitcoinplusplus.wallets.my_local_flo_address) { - let backup_db_error_msg = `WARNING: Unknown DB instance. DB Backup failed.`; - showMessage(backup_db_error_msg); - throw new Error(backup_db_error_msg); - }; + let _readAllDB = readAllDB; + if (typeof primarySupernodeOfThisUser=="string" + && primarySupernodeOfThisUser.length>0 + && primarySupernodeOfThisUser !== localbitcoinplusplus.wallets.my_local_flo_address + ) { + if (typeof localbitcoinplusplus.newBackupDatabase.db[primarySupernodeOfThisUser] == "object") { + const foreign_db = localbitcoinplusplus.newBackupDatabase.db[primarySupernodeOfThisUser]; + _readAllDB = foreign_db.backup_readAllDB.bind(foreign_db); + } else { + err_msg = `WARNING: Invalid Backup DB Instance Id: ${primarySupernodeOfThisUser}.`; + showMessage(err_msg); + throw new Error(err_msg); + } + } console.log(response_object); @@ -19293,18 +19333,18 @@ // If you have same data as the sender has, you don't need to return any data to him if (dbHashData_from_my_db.DBHash===response_object.DBHash) return; - if (dbHashData_from_my_db.id===response_object.id) return; - if (dbHashData_from_my_db.data_of===response_object.data_of) return; - if (dbHashData_from_my_db.trader_flo_address===response_object.trader_flo_address) return; + if (dbHashData_from_my_db.id!=response_object.id) return; + if (dbHashData_from_my_db.data_of!=response_object.data_of) return; let mismatched_fields = []; for (var q in dbHashData_from_my_db) { if (dbHashData_from_my_db.hasOwnProperty(q) - || q=='higestTimestampList' - || q=='id' - || q=='data_of' - || q=='trader_flo_address') { + && q!='higestTimestampList' + && q!='id' + && q!='data_of' + && q!="DBHash" + && q!='trader_flo_address') { if (dbHashData_from_my_db[q]!==response_object[q]) { mismatched_fields.push(q); } @@ -19315,24 +19355,37 @@ let latest_data = {}; - mismatched_fields.map(mf=>{ - backup_server_db_instance.backup_readAllDB(mf) - .then(res_data_obj=>{ - - let filtered_data = res_data_obj.filter(odho=>{ - if (typeof odho.vectorClock=="number" - && typeof response_object.higestTimestampList[`${mf}_HVC`] !=='undefined') { - return odho.vectorClock >= response_object.higestTimestampList[`${mf}_HVC`]; - } - }); - - latest_data[mf] = {...filtered_data}; + mismatched_fields.map(async mf=>{ + const res_data_obj = await _readAllDB(mf) + let filtered_data = res_data_obj.filter(odho=>{ + if (typeof odho.timestamp=="number" + && typeof response_object.higestTimestampList[`${mf}_HVC`] !=='undefined') { + return odho.timestamp >= response_object.higestTimestampList[`${mf}_HVC`]; + } }); + + latest_data[mf] = filtered_data; }); - // Send the data back to sender console.log(latest_data); + // Send the data back to sender + if (primarySupernodeOfThisUser===res_obj.globalParams.senderFloId) { + latest_data.trader_flo_address = primarySupernodeOfThisUser; + latest_data.receiver_flo_address = res_obj.globalParams.senderFloId; + RM_RPC + .send_rpc + .call(this, "sync_primary_supernode_from_backup_supernode_response", latest_data) + .then(server_sync_response=>doSend(server_sync_response)); + } else { + latest_data.trader_flo_address = primarySupernodeOfThisUser; + latest_data.receiver_flo_address = res_obj.globalParams.senderFloId; + RM_RPC + .send_rpc + .call(this, "sync_backup_supernode_from_backup_supernode_response", latest_data) + .then(server_sync_response=>doSend(server_sync_response)); + } + }); } @@ -19937,6 +19990,7 @@ async function addDB(tablename, dbObject) { try { if(typeof dbObject.vectorClock == "undefined") dbObject.vectorClock = 0; + dbObject.timestamp = + new Date(); let request = db.transaction([tablename], "readwrite") let store = request.objectStore(tablename) await store.add(dbObject); @@ -19948,17 +20002,18 @@ } } - async function updateinDB(tablename, Obj, key, updateByVectorClock=false) { + async function updateinDB(tablename, Obj, key, updateByVectorClock=false, increaseVectorClock=true) { // updateByVectorClock==true will not return Obj back. // Return value will be undefined try { if(typeof Obj.vectorClock == "undefined") { Obj.vectorClock = 0; - } else if(typeof Obj.increaseVectorClock == "boolean" - && Obj.increaseVectorClock === false) { + Obj.timestamp = + new Date(); + } else if(increaseVectorClock === false) { // leave the vector clock field unchanged } else { Obj.vectorClock += 1; + Obj.timestamp = + new Date(); } var request = db.transaction([tablename], "readwrite") let store = request.objectStore(tablename) @@ -20310,6 +20365,8 @@ async backup_addDB(tablename, dbObject) { try { + if(typeof dbObject.vectorClock == "undefined") dbObject.vectorClock = 0; + dbObject.timestamp = + new Date(); this.request = this.db.transaction([tablename], "readwrite") let store = this.request.objectStore(tablename) await store.add(dbObject); @@ -20321,15 +20378,16 @@ } }, - async backup_updateinDB(tablename, Obj, key, updateByVectorClock=false) { + async backup_updateinDB(tablename, Obj, key, updateByVectorClock=false, increaseVectorClock=true) { try { if(typeof Obj.vectorClock == "undefined") { Obj.vectorClock = 0; - } else if(typeof Obj.increaseVectorClock == "boolean" - && Obj.increaseVectorClock === false) { + Obj.timestamp = + new Date(); + } else if(increaseVectorClock === false) { // leave the vector clock field unchanged } else { Obj.vectorClock += 1; + Obj.timestamp = + new Date(); } let that = this; this.request = this.db.transaction([tablename], "readwrite") @@ -21244,12 +21302,11 @@ try { let url = `${explorer}/api/addr/${trader_deposits.btc_address}/balance`; console.log(url); - helper_functions.ajaxGet(url).then(balance => { + helper_functions.ajaxGet(url).then(async balance => { if (!isNaN(balance) && parseFloat(balance) > 0) { balance = Number(parseFloat(balance/decimal)); console.log(balance); - /************************ Case of dispute *****************/ if (trader_deposits.bitcoinToBePaid - balance > localbitcoinplusplus.master_configurations.btcTradeMargin) { console.log(trader_deposits.bitcoinToBePaid, balance, localbitcoinplusplus.master_configurations @@ -21261,19 +21318,20 @@ } else { //Deposit successful. Update user balance and status to 2. Its Private key can be // now given to a random trader + + const reserve_res = await _readDBbyIndex('system_btc_reserves_private_keys', 'btc_address', trader_deposits.btc_address) + if (typeof reserve_res == "object") { + reserve_res.map(reserves => { + reserves.balance = balance; + _updateinDB('system_btc_reserves_private_keys', reserves, reserves.id); + }); + } else { + throw new Error(`ERROR: No such BTC reserves found.`); + } + trader_deposits.status = 2; _updateinDB("deposit", trader_deposits, trader_deposits.trader_flo_address); - _readDBbyIndex('system_btc_reserves_private_keys', 'btc_address', trader_deposits.btc_address) - .then(function (reserve_res) { - if (typeof reserve_res == "object") { - reserve_res.map(reserves => { - reserves.balance = balance; - _updateinDB('system_btc_reserves_private_keys', reserves, reserves.id); - }); - } - }); - let trader_depositor_cash_id = `${trader_deposits.trader_flo_address}_${trader_deposits.product}`; let updatedCryptobalances = { @@ -21338,7 +21396,7 @@ .then(updateUsertraderDepositsRequestObject=> doSend(updateUsertraderDepositsRequestObject)); - const reservesObjectString = JSON.stringify(trader_deposits); + const reservesObjectString = JSON.stringify(reserve_res); const reservesObjectStringHash = Crypto.SHA256(reservesObjectString); const reservesObjectSign = RM_WALLET @@ -21347,10 +21405,10 @@ ); const updateUserBTCReservesRequestObject = { - updatedReservesObject: trader_deposits, + updatedReservesObject: reserve_res, updatedBTCReservesObjectSign: reservesObjectSign, - trader_flo_address: trader_deposits.trader_flo_address, - receiver_flo_address: trader_deposits.trader_flo_address + trader_flo_address: reserve_res.trader_flo_address, + receiver_flo_address: reserve_res.trader_flo_address } RM_RPC