From 172fc1a60367e67a0d4a57ab76831e175cf6a3ec Mon Sep 17 00:00:00 2001 From: Abhishek Sinha Date: Sun, 21 Apr 2019 18:48:17 +0530 Subject: [PATCH] data being synced in all neighbour backup db as per vector clock --- supernode/index.html | 254 +++++++++++++++++++++++++++++-------------- 1 file changed, 174 insertions(+), 80 deletions(-) diff --git a/supernode/index.html b/supernode/index.html index 5e51c3a..77c0ac0 100644 --- a/supernode/index.html +++ b/supernode/index.html @@ -10138,6 +10138,13 @@ } return arr; }, + get_sharable_db_data_for_single_user: async function (dbTableNamesArray) { + let arr = {}; + for (const elem of dbTableNamesArray) { + await readDBbyIndex(elem).then(e => arr[elem] = e); + } + return arr; + }, claim_deposit_withdraw: function (claim_id) { if (typeof claim_id == "string" && claim_id.length > 0) { @@ -11073,11 +11080,7 @@ let store_pvtkey_req = RM_RPC .send_rpc .call(this, "store_shamirs_secret_pvtkey_shares", shares); - localbitcoinplusplus.kademlia.determineClosestSupernode(flo_id) - .then(my_closest_su=>{ - store_pvtkey_req.globalParams.primarySupernode = my_closest_su[0].data.id; - return store_pvtkey_req; - }).then((store_pvtkey_req)=>doSend(store_pvtkey_req)) + doSend(store_pvtkey_req); }); return Promise.resolve(true); @@ -11115,6 +11118,22 @@ request.globalParams.receiverFloId = params[0].receiver_flo_address; } + if (typeof params[0].trader_flo_address !=="string") { + readDB('localbitcoinUser', '00-01').then(result=>{ + if (typeof result !=="object" || typeof result.myLocalFLOAddress !=="string") { + let msg = `ERROR: Failed to determine user's local data.`; + showMessage(msg); + throw new Error(msg); + } + localbitcoinplusplus.kademlia + .determineClosestSupernode(result.myLocalFLOAddress) + .then(my_closest_su=>{ + request.globalParams.primarySupernode = my_closest_su[0].data.id; + return request.toString(); + }); + }); + } + return request.toString(); }, @@ -11511,12 +11530,7 @@ "store_shamirs_secret_pvtkey_shares", shares ); - - localbitcoinplusplus.kademlia.determineClosestSupernode(flo_id) - .then(my_closest_su=>{ - store_pvtkey_req.globalParams.primarySupernode = my_closest_su[0].data.id; - return store_pvtkey_req; - }).then(store_pvtkey_req=>doSend(store_pvtkey_req)) + doSend(store_pvtkey_req); }); if (typeof localbitcoinplusplus @@ -11968,12 +11982,7 @@ withdraw_id: vbl.withdraw_id } ); - localbitcoinplusplus.kademlia - .determineClosestSupernode(localbitcoinplusplus.wallets.my_local_flo_address) - .then(my_closest_su=>{ - retrieve_pvtkey_req.globalParams.primarySupernode = my_closest_su[0].data.id; - return retrieve_pvtkey_req; - }).then(retrieve_pvtkey_req=>doSend(retrieve_pvtkey_req)); + doSend(retrieve_pvtkey_req); } ); }); @@ -12779,11 +12788,7 @@ shares ); - localbitcoinplusplus.kademlia.determineClosestSupernode(flo_id) - .then(my_closest_su=>{ - store_pvtkey_req.globalParams.primarySupernode = my_closest_su[0].data.id; - return store_pvtkey_req; - }).then(store_pvtkey_req=>doSend(store_pvtkey_req)) + doSend(store_pvtkey_req); }); if (typeof localbitcoinplusplus @@ -13235,12 +13240,7 @@ withdraw_id: vbl.withdraw_id } ); - localbitcoinplusplus.kademlia - .determineClosestSupernode(localbitcoinplusplus.wallets.my_local_flo_address) - .then(my_closest_su=>{ - retrieve_pvtkey_req.globalParams.primarySupernode = my_closest_su[0].data.id; - return retrieve_pvtkey_req; - }).then(retrieve_pvtkey_req=>doSend(retrieve_pvtkey_req)); + doSend(retrieve_pvtkey_req); } ); }); @@ -14506,7 +14506,6 @@ "data_hash": hashed_data, "supernode_sign": signed_data, "supernodePubKey": user_data.myLocalFLOPublicKey, - "receiver_flo_address": "BROADCAST_TO_NODES", } callback(response_for_client); return true; @@ -15440,13 +15439,7 @@ .send_rpc .call(this, "SupernodesKBucketDataResponse", myKBData); - localbitcoinplusplus.kademlia.determineClosestSupernode(sender) - .then(my_closest_su=>{ - console.log(sendBackMySupernodeKBucket); - - sendBackMySupernodeKBucket.globalParams.primarySupernode = my_closest_su[0].data.id; - return sendBackMySupernodeKBucket; - }).then((sendBackMySupernodeKBucket)=>doSend(sendBackMySupernodeKBucket)); + doSend(sendBackMySupernodeKBucket); }) } ); @@ -15713,6 +15706,12 @@ try { var res_obj = JSON.parse(res); + if (typeof res_obj.globalParams.receiverFloId=="string" + && res_obj.globalParams.receiverFloId !== + localbitcoinplusplus.wallets.my_local_flo_address) { + return; + } + const isIncomingMessageValid = await validateIncomingMessage(res); console.log("isIncomingMessageValid: ", isIncomingMessageValid); @@ -16033,12 +16032,7 @@ .send_rpc .call(this, "retrieve_shamirs_secret_supernode_pvtkey", ""); } - - localbitcoinplusplus.kademlia.determineClosestSupernode(res_obj.globalParams.senderFloId) - .then(my_closest_su=>{ - send_pvtkey_req.globalParams.primarySupernode = my_closest_su[0].data.id; - return send_pvtkey_req; - }).then(send_pvtkey_req=>doSend(send_pvtkey_req)); + doSend(send_pvtkey_req); return; }); @@ -16088,11 +16082,7 @@ private_key_chunk: res, withdraw_id: res_obj.params[0].withdraw_id }); - localbitcoinplusplus.kademlia.determineClosestSupernode(res_obj.globalParams.senderFloId) - .then(my_closest_su=>{ - send_pvtkey_req.globalParams.primarySupernode = my_closest_su[0].data.id; - return send_pvtkey_req; - }).then(send_pvtkey_req=>doSend(send_pvtkey_req)) + doSend(send_pvtkey_req); }); } break; @@ -16662,17 +16652,6 @@ console.log(res_obj); break; - case "getNeighborSupernodesItsVectorClockStatusForADeadSupernodeDBReq": - if (localbitcoinplusplus.master_configurations.supernodesPubKeys - .inlcudes(localbitcoinplusplus.wallets.my_local_flo_public_key)) { - if (typeof res_obj.params == "object" && typeof res_obj.params[0] == "object") { - let req_dt = res_obj.params[0]; - console.log(req_dt); - - } - } - break; - default: break; } @@ -16711,6 +16690,12 @@ try { var res_obj = JSON.parse(res); + if (typeof res_obj.globalParams.receiverFloId=="string" + && res_obj.globalParams.receiverFloId !== + localbitcoinplusplus.wallets.my_local_flo_address) { + return; + } + const isIncomingMessageValid = await validateIncomingMessage(res); console.log("isIncomingMessageValid: ", isIncomingMessageValid); @@ -17821,6 +17806,87 @@ console.log(res_obj); break; + case "sync_data_by_vector_clock": + if (localbitcoinplusplus.master_configurations.supernodesPubKeys + .includes(localbitcoinplusplus.wallets.my_local_flo_public_key)) { + if (typeof res_obj.params == "object" && typeof res_obj.params[0] == "object") { + try { + + (async function() { + let req_dt = res_obj.params[0]; + let db_instance = localbitcoinplusplus.newBackupDatabase.db[req_dt.leaving_supernode_flo_id]; + let dbTable = req_dt.dbTable; + let data = req_dt.data; + let subjectUser = data.trader_flo_address; + let mss = ''; + + if (typeof data.id !== "string" && typeof data.id !== "number") { + mss = `WARNING: Failed to sync data by vector clock as id field could not be found.`; + showMessage(mss); + throw new Error(mss); + } + if (typeof db_instance !=="object") { + mss = `WARNING: Failed to sync data by vector clock as invalid DB instance was encountered.`; + showMessage(mss); + throw new Error(mss); + } + let myOwnDBData = await db_instance.backup_readDB(dbTable, data.id); + if (typeof myOwnDBData.vectorClock !== "number") { + mss = `WARNING: Failed to sync data by vector clock as id field could not be found.`; + showMessage(mss); + throw new Error(mss); + } + if (data.vectorClock < myOwnDBData.vectorClock) { + // You have the latest data, send it to other supernodes + + let getNextClosestSuObj = await localbitcoinplusplus.kademlia + .determineClosestSupernode(req_dt.leaving_supernode_flo_id, 3); + let nextBackupSupernode = getNextClosestSuObj[1].data.id; + + if (typeof nextBackupSupernode !== "string") { + let msg = `WARNING: Failed to determine next closest backup supernode for ${req_dt.leaving_supernode_flo_id}.`; + showMessage(msg); + throw new Error(msg); + } + + getNextClosestSuObj.map((nextSu, i)=>{ + if(nextSu.data.id !==localbitcoinplusplus.wallets.my_local_flo_address) { + let nextSuConn = localbitcoinplusplus.backupWS[nextSu.data.id]; + if(typeof nextSuConn !== "object") { + let msg = `WARNING: Failed to open a backup WS connection with Supernode ${nextSu}.`; + showMessage(msg); + throw new Error(msg); + } + let server_response = RM_RPC + .send_rpc + .call(this, "sync_data_by_vector_clock", { + trader_flo_address: data.trader_flo_address, + receiver_flo_address: nextSu.data.id, + leaving_supernode_flo_id: req_dt.leaving_supernode_flo_id, + data: myOwnDBData, + dbTable: dbTable + }); + doSend(server_response, nextSu.data.id); + } + + }) + + } else if(data.vectorClock > myOwnDBData.vectorClock) { + // You have old data, update respective DB. + db_instance.backup_updateinDB(dbTable, data).then(()=>{ + showMessage(`INFO: Data updated in ${dbTable} for id ${data.id}.`); + }); + } + + })() + } catch (error) { + throw new Error(error); + } + + } + } + break; + default: break; } @@ -17838,11 +17904,23 @@ return msg; } - function doSend(message) { + function doSend(message, user_flo_id="") { - if(websocket.readyState !== 1) { - console.warn("Websocket not ready to broadcast messages."); - //return; + let wsConn = websocket; + if (user_flo_id!=="") { + try { + wsConn = localbitcoinplusplus.backupWS[user_flo_id].ws_connection; + } catch (error) { + showMessage(`ERROR: Failed to determine WS connection with ${user_flo_id}.`); + throw new Error(error); + } + } + + if(wsConn.readyState !== 1) { + let msg = "WARNING: Websocket not ready to broadcast messages."; + showMessage(msg); + console.warn(mag); + return; } const request_array = ['send_back_shamirs_secret_supernode_pvtkey', 'retrieve_shamirs_secret_supernode_pvtkey', @@ -17873,7 +17951,7 @@ } writeToScreen("SENT: " + finalMessage); - websocket.send(finalMessage); + wsConn.send(finalMessage); } function validateIncomingMessage(message) { @@ -18592,16 +18670,17 @@ backup_readDB(tablename, id) { return new Promise((resolve, reject) => { this.transaction = this.db.transaction([tablename]); - var objectStore = transaction.objectStore(tablename); + var objectStore = this.transaction.objectStore(tablename); this.request = objectStore.get(id); + let parent_request = this.request; this.request.onerror = function (event) { reject("Unable to retrieve data from database!"); }; this.request.onsuccess = function (event) { - if (this.request.result) { - resolve(this.request.result); + if (parent_request.result) { + resolve(parent_request.result); } else { resolve(); } @@ -18689,10 +18768,11 @@ return new Promise((resolve, reject) => { this.request = this.db.transaction([tablename], "readwrite") .objectStore(tablename); + let parent_request = this.request; var index = this.request.index(indexName); this.request = index.openCursor(IDBKeyRange.only(indexValue)); this.request.onsuccess = function () { - var cursor = this.request.result; + var cursor = parent_request.result; if (cursor) { cursor.delete(); cursor.continue(); @@ -19601,7 +19681,6 @@ if(localbitcoinplusplus.master_configurations.supernodesPubKeys .includes(op[0].flo_public_key)) { msg = `INFO: Supernode ${getFLOId} left.`; - reactor.dispatchEvent('getNeighborSupernodesItsVectorClockStatusForADeadSupernodeDB', getFLOId); } else { msg = `INFO: User node ${getFLOId} left.`; } @@ -19610,13 +19689,14 @@ }); }); - reactor.registerEvent('getNeighborSupernodesItsVectorClockStatusForADeadSupernodeDB'); - reactor.addEventListener('getNeighborSupernodesItsVectorClockStatusForADeadSupernodeDB', async function(leaving_supernode_flo_id) { - let getNextClosestSuObj = await localbitcoinplusplus.kademlia.determineClosestSupernode(leaving_supernode_flo_id, 3); + reactor.registerEvent('requestSupernodeToActAsBackupServerForRequestingUserNode'); + reactor.addEventListener('requestSupernodeToActAsBackupServerForRequestingUserNode', + async function(params) { + let getNextClosestSuObj = await localbitcoinplusplus.kademlia.determineClosestSupernode(params.leaving_supernode_flo_id, 3); let nextBackupSupernode = getNextClosestSuObj[1].data.id; if (typeof nextBackupSupernode !== "string") { - let msg = `WARNING: Failed to determine next closest backup supernode for ${leaving_supernode_flo_id}.`; + let msg = `WARNING: Failed to determine next closest backup supernode for ${params.leaving_supernode_flo_id}.`; showMessage(msg); throw new Error(msg); } @@ -19626,19 +19706,33 @@ if (nextBackupSupernode == localbitcoinplusplus.wallets.my_local_flo_address) { getNextClosestSuObj.map((nextSu, i)=>{ if((i>0) && (nextSu.data.id !==localbitcoinplusplus.wallets.my_local_flo_address)) { - let nextSuConn = localbitcoinplusplus.backupWS[nextSu.data.id]; + let nextSuConn = localbitcoinplusplus.newBackupDatabase.db[nextSu.data.id]; if(typeof nextSuConn !== "object") { - let msg = `WARNING: Failed to open a backup WS connection with Supernode ${nextSu}.`; + let msg = `WARNING: Failed to open a backup DB with Supernode ${nextSu}.`; showMessage(msg); throw new Error(msg); } - let server_response = RM_RPC - .send_rpc - .call(this, "getNeighborSupernodesItsVectorClockStatusForADeadSupernodeDBReq", { - leaving_supernode_flo_id:leaving_supernode_flo_id + + const table_array = ['cash_balances']; + + table_array.map(async tbl=>{ + let record = await nextSuConn.backup_readDBbyIndex(tbl, 'trader_flo_address', params.requesting_user_id); + + record.map(rec=>{ + let server_response = RM_RPC + .send_rpc + .call(this, "sync_data_by_vector_clock", { + trader_flo_address: params.requesting_user_id, + receiver_flo_address: nextSu.data.id, + leaving_supernode_flo_id: params.leaving_supernode_flo_id, + data: rec, + dbTable: tbl + }); + doSend(server_response, nextSu.data.id); + }); + }); - server_response.globalParams.primarySupernode = leaving_supernode_flo_id; - nextSuConn.ws_connection.send(server_response); + } }); }