data being synced in all neighbour backup db as per vector clock

This commit is contained in:
Abhishek Sinha 2019-04-21 18:48:17 +05:30
parent 891ffe4e6c
commit 172fc1a603

View File

@ -10138,6 +10138,13 @@
}
return arr;
},
get_sharable_db_data_for_single_user: async function (dbTableNamesArray) {
let arr = {};
for (const elem of dbTableNamesArray) {
await readDBbyIndex(elem).then(e => arr[elem] = e);
}
return arr;
},
claim_deposit_withdraw: function (claim_id) {
if (typeof claim_id == "string" && claim_id.length > 0) {
@ -11073,11 +11080,7 @@
let store_pvtkey_req = RM_RPC
.send_rpc
.call(this, "store_shamirs_secret_pvtkey_shares", shares);
localbitcoinplusplus.kademlia.determineClosestSupernode(flo_id)
.then(my_closest_su=>{
store_pvtkey_req.globalParams.primarySupernode = my_closest_su[0].data.id;
return store_pvtkey_req;
}).then((store_pvtkey_req)=>doSend(store_pvtkey_req))
doSend(store_pvtkey_req);
});
return Promise.resolve(true);
@ -11115,6 +11118,22 @@
request.globalParams.receiverFloId = params[0].receiver_flo_address;
}
if (typeof params[0].trader_flo_address !=="string") {
readDB('localbitcoinUser', '00-01').then(result=>{
if (typeof result !=="object" || typeof result.myLocalFLOAddress !=="string") {
let msg = `ERROR: Failed to determine user's local data.`;
showMessage(msg);
throw new Error(msg);
}
localbitcoinplusplus.kademlia
.determineClosestSupernode(result.myLocalFLOAddress)
.then(my_closest_su=>{
request.globalParams.primarySupernode = my_closest_su[0].data.id;
return request.toString();
});
});
}
return request.toString();
},
@ -11511,12 +11530,7 @@
"store_shamirs_secret_pvtkey_shares",
shares
);
localbitcoinplusplus.kademlia.determineClosestSupernode(flo_id)
.then(my_closest_su=>{
store_pvtkey_req.globalParams.primarySupernode = my_closest_su[0].data.id;
return store_pvtkey_req;
}).then(store_pvtkey_req=>doSend(store_pvtkey_req))
doSend(store_pvtkey_req);
});
if (typeof localbitcoinplusplus
@ -11968,12 +11982,7 @@
withdraw_id: vbl.withdraw_id
}
);
localbitcoinplusplus.kademlia
.determineClosestSupernode(localbitcoinplusplus.wallets.my_local_flo_address)
.then(my_closest_su=>{
retrieve_pvtkey_req.globalParams.primarySupernode = my_closest_su[0].data.id;
return retrieve_pvtkey_req;
}).then(retrieve_pvtkey_req=>doSend(retrieve_pvtkey_req));
doSend(retrieve_pvtkey_req);
}
);
});
@ -12779,11 +12788,7 @@
shares
);
localbitcoinplusplus.kademlia.determineClosestSupernode(flo_id)
.then(my_closest_su=>{
store_pvtkey_req.globalParams.primarySupernode = my_closest_su[0].data.id;
return store_pvtkey_req;
}).then(store_pvtkey_req=>doSend(store_pvtkey_req))
doSend(store_pvtkey_req);
});
if (typeof localbitcoinplusplus
@ -13235,12 +13240,7 @@
withdraw_id: vbl.withdraw_id
}
);
localbitcoinplusplus.kademlia
.determineClosestSupernode(localbitcoinplusplus.wallets.my_local_flo_address)
.then(my_closest_su=>{
retrieve_pvtkey_req.globalParams.primarySupernode = my_closest_su[0].data.id;
return retrieve_pvtkey_req;
}).then(retrieve_pvtkey_req=>doSend(retrieve_pvtkey_req));
doSend(retrieve_pvtkey_req);
}
);
});
@ -14506,7 +14506,6 @@
"data_hash": hashed_data,
"supernode_sign": signed_data,
"supernodePubKey": user_data.myLocalFLOPublicKey,
"receiver_flo_address": "BROADCAST_TO_NODES",
}
callback(response_for_client);
return true;
@ -15440,13 +15439,7 @@
.send_rpc
.call(this, "SupernodesKBucketDataResponse", myKBData);
localbitcoinplusplus.kademlia.determineClosestSupernode(sender)
.then(my_closest_su=>{
console.log(sendBackMySupernodeKBucket);
sendBackMySupernodeKBucket.globalParams.primarySupernode = my_closest_su[0].data.id;
return sendBackMySupernodeKBucket;
}).then((sendBackMySupernodeKBucket)=>doSend(sendBackMySupernodeKBucket));
doSend(sendBackMySupernodeKBucket);
})
}
);
@ -15713,6 +15706,12 @@
try {
var res_obj = JSON.parse(res);
if (typeof res_obj.globalParams.receiverFloId=="string"
&& res_obj.globalParams.receiverFloId !==
localbitcoinplusplus.wallets.my_local_flo_address) {
return;
}
const isIncomingMessageValid = await validateIncomingMessage(res);
console.log("isIncomingMessageValid: ", isIncomingMessageValid);
@ -16033,12 +16032,7 @@
.send_rpc
.call(this, "retrieve_shamirs_secret_supernode_pvtkey", "");
}
localbitcoinplusplus.kademlia.determineClosestSupernode(res_obj.globalParams.senderFloId)
.then(my_closest_su=>{
send_pvtkey_req.globalParams.primarySupernode = my_closest_su[0].data.id;
return send_pvtkey_req;
}).then(send_pvtkey_req=>doSend(send_pvtkey_req));
doSend(send_pvtkey_req);
return;
});
@ -16088,11 +16082,7 @@
private_key_chunk: res,
withdraw_id: res_obj.params[0].withdraw_id
});
localbitcoinplusplus.kademlia.determineClosestSupernode(res_obj.globalParams.senderFloId)
.then(my_closest_su=>{
send_pvtkey_req.globalParams.primarySupernode = my_closest_su[0].data.id;
return send_pvtkey_req;
}).then(send_pvtkey_req=>doSend(send_pvtkey_req))
doSend(send_pvtkey_req);
});
}
break;
@ -16662,17 +16652,6 @@
console.log(res_obj);
break;
case "getNeighborSupernodesItsVectorClockStatusForADeadSupernodeDBReq":
if (localbitcoinplusplus.master_configurations.supernodesPubKeys
.inlcudes(localbitcoinplusplus.wallets.my_local_flo_public_key)) {
if (typeof res_obj.params == "object" && typeof res_obj.params[0] == "object") {
let req_dt = res_obj.params[0];
console.log(req_dt);
}
}
break;
default:
break;
}
@ -16711,6 +16690,12 @@
try {
var res_obj = JSON.parse(res);
if (typeof res_obj.globalParams.receiverFloId=="string"
&& res_obj.globalParams.receiverFloId !==
localbitcoinplusplus.wallets.my_local_flo_address) {
return;
}
const isIncomingMessageValid = await validateIncomingMessage(res);
console.log("isIncomingMessageValid: ", isIncomingMessageValid);
@ -17821,6 +17806,87 @@
console.log(res_obj);
break;
case "sync_data_by_vector_clock":
if (localbitcoinplusplus.master_configurations.supernodesPubKeys
.includes(localbitcoinplusplus.wallets.my_local_flo_public_key)) {
if (typeof res_obj.params == "object" && typeof res_obj.params[0] == "object") {
try {
(async function() {
let req_dt = res_obj.params[0];
let db_instance = localbitcoinplusplus.newBackupDatabase.db[req_dt.leaving_supernode_flo_id];
let dbTable = req_dt.dbTable;
let data = req_dt.data;
let subjectUser = data.trader_flo_address;
let mss = '';
if (typeof data.id !== "string" && typeof data.id !== "number") {
mss = `WARNING: Failed to sync data by vector clock as id field could not be found.`;
showMessage(mss);
throw new Error(mss);
}
if (typeof db_instance !=="object") {
mss = `WARNING: Failed to sync data by vector clock as invalid DB instance was encountered.`;
showMessage(mss);
throw new Error(mss);
}
let myOwnDBData = await db_instance.backup_readDB(dbTable, data.id);
if (typeof myOwnDBData.vectorClock !== "number") {
mss = `WARNING: Failed to sync data by vector clock as id field could not be found.`;
showMessage(mss);
throw new Error(mss);
}
if (data.vectorClock < myOwnDBData.vectorClock) {
// You have the latest data, send it to other supernodes
let getNextClosestSuObj = await localbitcoinplusplus.kademlia
.determineClosestSupernode(req_dt.leaving_supernode_flo_id, 3);
let nextBackupSupernode = getNextClosestSuObj[1].data.id;
if (typeof nextBackupSupernode !== "string") {
let msg = `WARNING: Failed to determine next closest backup supernode for ${req_dt.leaving_supernode_flo_id}.`;
showMessage(msg);
throw new Error(msg);
}
getNextClosestSuObj.map((nextSu, i)=>{
if(nextSu.data.id !==localbitcoinplusplus.wallets.my_local_flo_address) {
let nextSuConn = localbitcoinplusplus.backupWS[nextSu.data.id];
if(typeof nextSuConn !== "object") {
let msg = `WARNING: Failed to open a backup WS connection with Supernode ${nextSu}.`;
showMessage(msg);
throw new Error(msg);
}
let server_response = RM_RPC
.send_rpc
.call(this, "sync_data_by_vector_clock", {
trader_flo_address: data.trader_flo_address,
receiver_flo_address: nextSu.data.id,
leaving_supernode_flo_id: req_dt.leaving_supernode_flo_id,
data: myOwnDBData,
dbTable: dbTable
});
doSend(server_response, nextSu.data.id);
}
})
} else if(data.vectorClock > myOwnDBData.vectorClock) {
// You have old data, update respective DB.
db_instance.backup_updateinDB(dbTable, data).then(()=>{
showMessage(`INFO: Data updated in ${dbTable} for id ${data.id}.`);
});
}
})()
} catch (error) {
throw new Error(error);
}
}
}
break;
default:
break;
}
@ -17838,11 +17904,23 @@
return msg;
}
function doSend(message) {
function doSend(message, user_flo_id="") {
if(websocket.readyState !== 1) {
console.warn("Websocket not ready to broadcast messages.");
//return;
let wsConn = websocket;
if (user_flo_id!=="") {
try {
wsConn = localbitcoinplusplus.backupWS[user_flo_id].ws_connection;
} catch (error) {
showMessage(`ERROR: Failed to determine WS connection with ${user_flo_id}.`);
throw new Error(error);
}
}
if(wsConn.readyState !== 1) {
let msg = "WARNING: Websocket not ready to broadcast messages.";
showMessage(msg);
console.warn(mag);
return;
}
const request_array = ['send_back_shamirs_secret_supernode_pvtkey',
'retrieve_shamirs_secret_supernode_pvtkey',
@ -17873,7 +17951,7 @@
}
writeToScreen("SENT: " + finalMessage);
websocket.send(finalMessage);
wsConn.send(finalMessage);
}
function validateIncomingMessage(message) {
@ -18592,16 +18670,17 @@
backup_readDB(tablename, id) {
return new Promise((resolve, reject) => {
this.transaction = this.db.transaction([tablename]);
var objectStore = transaction.objectStore(tablename);
var objectStore = this.transaction.objectStore(tablename);
this.request = objectStore.get(id);
let parent_request = this.request;
this.request.onerror = function (event) {
reject("Unable to retrieve data from database!");
};
this.request.onsuccess = function (event) {
if (this.request.result) {
resolve(this.request.result);
if (parent_request.result) {
resolve(parent_request.result);
} else {
resolve();
}
@ -18689,10 +18768,11 @@
return new Promise((resolve, reject) => {
this.request = this.db.transaction([tablename], "readwrite")
.objectStore(tablename);
let parent_request = this.request;
var index = this.request.index(indexName);
this.request = index.openCursor(IDBKeyRange.only(indexValue));
this.request.onsuccess = function () {
var cursor = this.request.result;
var cursor = parent_request.result;
if (cursor) {
cursor.delete();
cursor.continue();
@ -19601,7 +19681,6 @@
if(localbitcoinplusplus.master_configurations.supernodesPubKeys
.includes(op[0].flo_public_key)) {
msg = `INFO: Supernode ${getFLOId} left.`;
reactor.dispatchEvent('getNeighborSupernodesItsVectorClockStatusForADeadSupernodeDB', getFLOId);
} else {
msg = `INFO: User node ${getFLOId} left.`;
}
@ -19610,13 +19689,14 @@
});
});
reactor.registerEvent('getNeighborSupernodesItsVectorClockStatusForADeadSupernodeDB');
reactor.addEventListener('getNeighborSupernodesItsVectorClockStatusForADeadSupernodeDB', async function(leaving_supernode_flo_id) {
let getNextClosestSuObj = await localbitcoinplusplus.kademlia.determineClosestSupernode(leaving_supernode_flo_id, 3);
reactor.registerEvent('requestSupernodeToActAsBackupServerForRequestingUserNode');
reactor.addEventListener('requestSupernodeToActAsBackupServerForRequestingUserNode',
async function(params) {
let getNextClosestSuObj = await localbitcoinplusplus.kademlia.determineClosestSupernode(params.leaving_supernode_flo_id, 3);
let nextBackupSupernode = getNextClosestSuObj[1].data.id;
if (typeof nextBackupSupernode !== "string") {
let msg = `WARNING: Failed to determine next closest backup supernode for ${leaving_supernode_flo_id}.`;
let msg = `WARNING: Failed to determine next closest backup supernode for ${params.leaving_supernode_flo_id}.`;
showMessage(msg);
throw new Error(msg);
}
@ -19626,19 +19706,33 @@
if (nextBackupSupernode == localbitcoinplusplus.wallets.my_local_flo_address) {
getNextClosestSuObj.map((nextSu, i)=>{
if((i>0) && (nextSu.data.id !==localbitcoinplusplus.wallets.my_local_flo_address)) {
let nextSuConn = localbitcoinplusplus.backupWS[nextSu.data.id];
let nextSuConn = localbitcoinplusplus.newBackupDatabase.db[nextSu.data.id];
if(typeof nextSuConn !== "object") {
let msg = `WARNING: Failed to open a backup WS connection with Supernode ${nextSu}.`;
let msg = `WARNING: Failed to open a backup DB with Supernode ${nextSu}.`;
showMessage(msg);
throw new Error(msg);
}
let server_response = RM_RPC
.send_rpc
.call(this, "getNeighborSupernodesItsVectorClockStatusForADeadSupernodeDBReq", {
leaving_supernode_flo_id:leaving_supernode_flo_id
const table_array = ['cash_balances'];
table_array.map(async tbl=>{
let record = await nextSuConn.backup_readDBbyIndex(tbl, 'trader_flo_address', params.requesting_user_id);
record.map(rec=>{
let server_response = RM_RPC
.send_rpc
.call(this, "sync_data_by_vector_clock", {
trader_flo_address: params.requesting_user_id,
receiver_flo_address: nextSu.data.id,
leaving_supernode_flo_id: params.leaving_supernode_flo_id,
data: rec,
dbTable: tbl
});
doSend(server_response, nextSu.data.id);
});
});
server_response.globalParams.primarySupernode = leaving_supernode_flo_id;
nextSuConn.ws_connection.send(server_response);
}
});
}