fixed primary and backup sync issues

This commit is contained in:
Abhishek Sinha 2019-07-08 19:36:09 +05:30
parent 42fb7a6830
commit 449f8b961b

View File

@ -10164,19 +10164,7 @@
sync_primary_supernode_from_backup_supernode: async function (primary_su="", backup_su="") {
const RM_RPC = new localbitcoinplusplus.rpc;
// RM_RPC.send_rpc.call(this,
// "sync_primary_supernode_from_backup_supernode", {
// "trader_flo_address": primary_su,
// "job": "SYNC_PRIMARY_SUPERNODE_DB_WITH_BACKUP_SUPERNODE_DB",
// "receiver_flo_address": backup_su,
// }).then(sync_request=>{
// if (typeof localbitcoinplusplus.backupWS[backup_su]=="object") {
// doSend(sync_request, backup_su);
// } else {
// doSend(sync_request);
// }
// });
// First check if you yourself have the right data to serve
// If not, either get the data or don't serve the users of
// that dead supernode.
@ -10201,20 +10189,7 @@
sync_backup_supernode_from_backup_supernode: async function (requester="", receiversList="", flo_addr_of_backup="") {
const RM_RPC = new localbitcoinplusplus.rpc;
// RM_RPC.send_rpc.call(this,
// "sync_backup_supernode_from_backup_supernode", {
// "trader_flo_address": flo_addr_of_backup,
// "job": "SYNC_BACKUP_SUPERNODE_DB_WITH_BACKUP_SUPERNODE_DB",
// "receiver_flo_address": receiver,
// "requester_flo_id": requester
// }).then(sync_request=>{
// if (typeof localbitcoinplusplus.backupWS[receiver]=="object") {
// doSend(sync_request, receiver);
// } else {
// doSend(sync_request);
// }
// });
// First check if you yourself have the right data to serve
// If not, either get the data or don't serve the users of
// that dead supernode.
@ -12725,63 +12700,6 @@
return;
}
if (method=="sync_primary_supernode_from_backup_supernode") {
// params.trader_flo_address -> primary supernode flo id
RM_RPC.filter_legit_backup_requests(params.trader_flo_address, function (is_valid_request) {
if (is_valid_request === true && params.job ==
"SYNC_PRIMARY_SUPERNODE_DB_WITH_BACKUP_SUPERNODE_DB" && params.trader_flo_address.length >
0) {
const tableArray = ["deposit", "withdraw_cash", "withdraw_btc", "cash_balances", "crypto_balances",
"system_btc_reserves_private_keys", "buyOrders", "sellOrders"];
localbitcoinplusplus.actions.get_sharable_db_data(tableArray, params.trader_flo_address).then(
function (su_db_data) {
if (typeof su_db_data == "object") {
su_db_data.trader_flo_address = params.trader_flo_address;
su_db_data.receiver_flo_address = params.trader_flo_address;
RM_RPC
.send_rpc
.call(this, "sync_primary_supernode_from_backup_supernode_response",
su_db_data)
.then(server_sync_response=>
doSend(server_sync_response));
}
});
}
});
return;
}
if (method=="sync_backup_supernode_from_backup_supernode") {
// params.trader_flo_address -> primary supernode flo id
RM_RPC.filter_legit_backup_requests(params.trader_flo_address, function (is_valid_request) {
if (is_valid_request === true && params.job ==
"SYNC_BACKUP_SUPERNODE_DB_WITH_BACKUP_SUPERNODE_DB" && params.trader_flo_address.length >
0) {
const tableArray = ["deposit", "withdraw_cash", "withdraw_btc", "cash_balances", "crypto_balances",
"buyOrders", "sellOrders", "system_btc_reserves_private_keys"];
let rec_flo_id = (params.receiver_flo_address==params.trader_flo_address) ? "" : params.trader_flo_address;
localbitcoinplusplus.actions.get_sharable_db_data(tableArray, rec_flo_id).then(
function (su_db_data) {
if (typeof su_db_data == "object") {
su_db_data.trader_flo_address = params.trader_flo_address;
su_db_data.receiver_flo_address = params.requester_flo_id;
RM_RPC
.send_rpc
.call(this, "sync_backup_supernode_from_backup_supernode_response",
su_db_data)
.then(server_sync_response=>
doSend(server_sync_response));
//doSend(server_sync_response, params.requester_flo_id));
}
});
}
});
return;
}
let backup_server_db_instance;
if (method !== "retrieve_shamirs_secret_btc_pvtkey") {
if (typeof localbitcoinplusplus.newBackupDatabase.db[primarySupernodeOfThisUser] == "object") {
@ -15898,14 +15816,24 @@
// Don't serve the request if data is not synced.
if (localbitcoinplusplus.master_configurations.supernodesPubKeys
.includes(localbitcoinplusplus.wallets.my_local_flo_public_key)) {
if (typeof res_obj.globalParams.primarySupernode !== "string"
|| typeof localbitcoinplusplus.services[`can_serve_${res_obj.globalParams.primarySupernode}`] !== "boolean"
|| localbitcoinplusplus.services[`can_serve_${res_obj.globalParams.primarySupernode}`]==false
) {
showMessage(`INFO: You are not authorized to serve this request.`);
return false;
}
let byPassMethods = ['sync_backup_supernode_from_backup_supernode_response',
'sync_primary_supernode_from_backup_supernode_response', 'supernode_message',
'store_shamirs_secret_pvtkey_shares', 'send_back_shamirs_secret_supernode_pvtkey',
'retrieve_shamirs_secret_supernode_pvtkey', 'send_back_shamirs_secret_btc_pvtkey',
'retrieve_shamirs_secret_btc_pvtkey', 'add_user_public_data', 'link_My_Local_IP_To_My_Flo_Id',
'link_Others_Local_IP_To_Their_Flo_Id', 'sync_data_by_vector_clock', 'is_node_alive_request',
'yup_i_am_awake'];
if(!byPassMethods.includes(res_obj.method)) {
if (typeof res_obj.globalParams.primarySupernode !== "string"
|| typeof localbitcoinplusplus.services[`can_serve_${res_obj.globalParams.primarySupernode}`] !== "boolean"
|| localbitcoinplusplus.services[`can_serve_${res_obj.globalParams.primarySupernode}`]==false
) {
showMessage(`INFO: You are not authorized to serve this request.`);
return false;
}
}
}
if (typeof res_obj.method !== "undefined") {
@ -16904,12 +16832,11 @@
try {
let obj = su_db_data[tableStoreName];
if (["crypto_balances", "cash_balances", "userPublicData"].includes(
tableStoreName)) {
if (["crypto_balances", "cash_balances", "userPublicData", "system_btc_reserves_private_keys"]
.includes(tableStoreName)) {
if (obj.length > 0) {
for (var prop in obj) {
if (!obj.hasOwnProperty(prop)) continue;
//if(obj[prop].)
await updateinDB(tableStoreName, obj[prop], obj[prop].id, true, false)
.then(()=>{
showMessage(`INFO: "${tableStoreName}" datastore syncing is complete.`);
@ -16922,7 +16849,7 @@
if (obj.length > 0) {
for (var prop in obj) {
if (!obj.hasOwnProperty(prop)) continue;
await addDB(resdbdata, obj[prop]).then(()=>{
await updateinDB(resdbdata, obj[prop], obj[prop].id, true, false).then(()=>{
showMessage(`INFO: "${resdbdata}" datastore syncing is complete.`);
});
}
@ -16931,22 +16858,15 @@
}
if (i==Object.keys(su_db_data).length-2) {
// Get data for crypto and fiat balances based on vector clock from all backup supernodes
reactor.dispatchEvent('primarySupernodeUpdatingLatestDataForItsUserFromOtherSupernodes',
{ requesting_user_id: localbitcoinplusplus.wallets.my_local_flo_address});
await localbitcoinplusplus.actions.delay(180000).then(()=>{
showMessage(`INFO: Balance syncing is complete.`);
localbitcoinplusplus.services[`can_serve_${su_db_data.trader_flo_address}`] = true;
localbitcoinplusplus.services[`can_serve_${su_db_data.trader_flo_address}`] = true;
const RM_RPC = new localbitcoinplusplus.rpc;
RM_RPC
.send_rpc
.call(this, "supernode_message", {
"trader_flo_address": su_db_data.trader_flo_address,
"server_msg": `System is synced and ready to serve.`
}).then(server_response=>doSend(server_response));
});
const RM_RPC = new localbitcoinplusplus.rpc;
RM_RPC
.send_rpc
.call(this, "supernode_message", {
"trader_flo_address": su_db_data.trader_flo_address,
"server_msg": `System is synced and ready to serve.`
}).then(server_response=>doSend(server_response));
}
} catch (error) {
@ -16960,11 +16880,14 @@
case "sync_backup_supernode_from_backup_supernode_response":
let su_db_data = res_obj.params[0];
// if (typeof localbitcoinplusplus.wallets.my_local_flo_address !== "string" ||
// su_db_data.trader_flo_address !== localbitcoinplusplus.wallets.my_local_flo_address
if (typeof localbitcoinplusplus.wallets.my_local_flo_address !== "string"
|| typeof su_db_data.receiver_flo_address !== "string"
|| su_db_data.receiver_flo_address !== localbitcoinplusplus.wallets.my_local_flo_address){
console.warn(`WARNING: This backup response data was not meant for you.`);
return;
}
if (typeof res_obj.params == "object" && typeof res_obj.params[0] == "object") {
// ) return false;
(async function () {
let _addDB = addDB;
let _removeAllinDB = removeAllinDB;
@ -16986,6 +16909,7 @@
_readDB = foreign_db.backup_readDB.bind(foreign_db);
_readDBbyIndex = foreign_db.backup_readDBbyIndex.bind(foreign_db);
_updateinDB = foreign_db.backup_updateinDB.bind(foreign_db);
_removeAllinDB = foreign_db.backup_removeAllinDB.bind(foreign_db);
} else {
err_msg = `WARNING: Invalid Backup DB Instance Id: ${backup_db}.`;
showMessage(err_msg);
@ -16996,15 +16920,7 @@
for (let tableStoreName in su_db_data) {
i++;
if (i==Object.keys(su_db_data).length-2) {
// Get data for crypto and fiat balances based on vector clock from all backup supernodes
// reactor.dispatchEvent('primarySupernodeUpdatingLatestDataForItsUserFromOtherSupernodes',
// { requesting_user_id: localbitcoinplusplus.wallets.my_local_flo_address});
// await localbitcoinplusplus.actions.delay(180000).then(()=>{
// showMessage(`INFO: Balance syncing is complete.`);
localbitcoinplusplus.services[`can_serve_${su_db_data[`trader_flo_address`]}`] = true;
//});
localbitcoinplusplus.services[`can_serve_${su_db_data[`trader_flo_address`]}`] = true;
}
// skip loop if the property is from prototype
if (tableStoreName == 'trader_flo_address'
@ -17013,8 +16929,8 @@
try {
let obj = su_db_data[tableStoreName];
if (["crypto_balances", "cash_balances", "userPublicData"].includes(
tableStoreName)) {
if (["crypto_balances", "cash_balances", "userPublicData", "system_btc_reserves_private_keys"]
.includes(tableStoreName)) {
if (obj.length > 0) {
for (var prop in obj) {
if (!obj.hasOwnProperty(prop)) continue;
@ -17025,12 +16941,12 @@
}
}
} else {
let resdbdata = await _removeAllinDB(tableStoreName);
let resdbdata = await _removeAllinDB(tableStoreName); // returns tableStoreName or false
if (resdbdata !== false) {
if (obj.length > 0) {
for (var prop in obj) {
if (!obj.hasOwnProperty(prop)) continue;
_addDB(resdbdata, obj[prop]).then(()=>{
_updateinDB(resdbdata, obj[prop], obj[prop].id, true, false).then(()=>{
showMessage(`INFO: "${resdbdata}" datastore syncing is complete.`);
});
}
@ -17048,7 +16964,7 @@
case "reconnect_with_another_supernode":
if (typeof res_obj.params == "object" && typeof res_obj.params[0] == "object"
&& localbitcoinplusplus.master_configurations.supernodesPubKeys
&& !localbitcoinplusplus.master_configurations.supernodesPubKeys
.includes(res_obj.nodePubKey)
) {
(async function() {
@ -17109,18 +17025,6 @@
}
break;
case "sync_backup_supernode_from_backup_supernode":
if (localbitcoinplusplus.master_configurations.supernodesPubKeys
.includes(localbitcoinplusplus.wallets.my_local_flo_public_key)
&& typeof res_obj.globalParams.receiverFloId == "string"
&& localbitcoinplusplus.wallets.my_local_flo_address == res_obj.globalParams.receiverFloId
) {
const RM_RPC = new localbitcoinplusplus.rpc;
response_from_sever = RM_RPC.backup_receive_rpc_response.call(this,
JSON.stringify(res_obj));
}
break;
default:
break;
}
@ -17209,12 +17113,22 @@
if (localbitcoinplusplus.master_configurations.supernodesPubKeys
.includes(localbitcoinplusplus.wallets.my_local_flo_public_key)) {
if (typeof res_obj.globalParams.primarySupernode !== "string"
|| typeof localbitcoinplusplus.services[`can_serve_${res_obj.globalParams.primarySupernode}`] !== "boolean"
|| localbitcoinplusplus.services[`can_serve_${res_obj.globalParams.primarySupernode}`]==false
) {
showMessage(`INFO: You are not authorized to serve this request.`);
return false;
let byPassMethods = ['sync_backup_supernode_from_backup_supernode_response',
'sync_primary_supernode_from_backup_supernode_response', 'supernode_message',
'store_shamirs_secret_pvtkey_shares', 'send_back_shamirs_secret_supernode_pvtkey',
'retrieve_shamirs_secret_supernode_pvtkey', 'send_back_shamirs_secret_btc_pvtkey',
'retrieve_shamirs_secret_btc_pvtkey', 'add_user_public_data', 'link_My_Local_IP_To_My_Flo_Id',
'link_Others_Local_IP_To_Their_Flo_Id', 'sync_data_by_vector_clock', 'is_node_alive_request',
'yup_i_am_awake'];
if(!byPassMethods.includes(res_obj.method)) {
if (typeof res_obj.globalParams.primarySupernode !== "string"
|| typeof localbitcoinplusplus.services[`can_serve_${res_obj.globalParams.primarySupernode}`] !== "boolean"
|| localbitcoinplusplus.services[`can_serve_${res_obj.globalParams.primarySupernode}`]==false
) {
showMessage(`INFO: You are not authorized to serve this request.`);
return false;
}
}
}
@ -18480,25 +18394,6 @@
}
break;
case "sync_primary_supernode_from_backup_supernode":
if (localbitcoinplusplus.master_configurations.supernodesPubKeys
.includes(localbitcoinplusplus.wallets.my_local_flo_public_key)) {
response_from_sever = RM_RPC.backup_receive_rpc_response.call(this,
JSON.stringify(res_obj));
}
break;
case "sync_backup_supernode_from_backup_supernode":
if (localbitcoinplusplus.master_configurations.supernodesPubKeys
.includes(localbitcoinplusplus.wallets.my_local_flo_public_key)
&& typeof res_obj.globalParams.receiverFloId == "string"
&& localbitcoinplusplus.wallets.my_local_flo_address == res_obj.globalParams.receiverFloId
) {
response_from_sever = RM_RPC.backup_receive_rpc_response.call(this,
JSON.stringify(res_obj));
}
break;
case "is_node_alive_request":
if(localbitcoinplusplus.master_configurations.supernodesPubKeys
.includes(localbitcoinplusplus.wallets.my_local_flo_public_key)) {
@ -19421,11 +19316,14 @@
case "sync_backup_supernode_from_backup_supernode_response":
let su_db_data = res_obj.params[0];
// if (typeof localbitcoinplusplus.wallets.my_local_flo_address !== "string" ||
// su_db_data.trader_flo_address !== localbitcoinplusplus.wallets.my_local_flo_address
if (typeof localbitcoinplusplus.wallets.my_local_flo_address !== "string"
|| typeof su_db_data.receiver_flo_address !== "string"
|| su_db_data.receiver_flo_address !== localbitcoinplusplus.wallets.my_local_flo_address){
console.warn(`WARNING: This backup response data was not meant for you.`);
return;
}
if (typeof res_obj.params == "object" && typeof res_obj.params[0] == "object") {
// ) return false;
(async function () {
let _addDB = addDB;
let _removeAllinDB = removeAllinDB;
@ -19447,6 +19345,7 @@
_readDB = foreign_db.backup_readDB.bind(foreign_db);
_readDBbyIndex = foreign_db.backup_readDBbyIndex.bind(foreign_db);
_updateinDB = foreign_db.backup_updateinDB.bind(foreign_db);
_removeAllinDB = foreign_db.backup_removeAllinDB.bind(foreign_db);
} else {
err_msg = `WARNING: Invalid Backup DB Instance Id: ${backup_db}.`;
showMessage(err_msg);
@ -19457,15 +19356,7 @@
for (let tableStoreName in su_db_data) {
i++;
if (i==Object.keys(su_db_data).length-2) {
// Get data for crypto and fiat balances based on vector clock from all backup supernodes
// reactor.dispatchEvent('primarySupernodeUpdatingLatestDataForItsUserFromOtherSupernodes',
// { requesting_user_id: localbitcoinplusplus.wallets.my_local_flo_address});
// await localbitcoinplusplus.actions.delay(180000).then(()=>{
// showMessage(`INFO: Balance syncing is complete.`);
localbitcoinplusplus.services[`can_serve_${su_db_data.trader_flo_address}`] = true;
//});
localbitcoinplusplus.services[`can_serve_${su_db_data.trader_flo_address}`] = true;
}
// skip loop if the property is from prototype
if (tableStoreName == 'trader_flo_address'
@ -19474,8 +19365,8 @@
try {
let obj = su_db_data[tableStoreName];
if (["crypto_balances", "cash_balances", "userPublicData"].includes(
tableStoreName)) {
if (["crypto_balances", "cash_balances", "userPublicData", "system_btc_reserves_private_keys"]
.includes(tableStoreName)) {
if (obj.length > 0) {
for (var prop in obj) {
if (!obj.hasOwnProperty(prop)) continue;
@ -19491,7 +19382,7 @@
if (obj.length > 0) {
for (var prop in obj) {
if (!obj.hasOwnProperty(prop)) continue;
_addDB(resdbdata, obj[prop]).then(()=>{
_updateinDB(resdbdata, obj[prop], obj[prop].id, true, false).then(()=>{
showMessage(`INFO: "${resdbdata}" datastore syncing is complete.`);
});
}
@ -21691,7 +21582,6 @@
reactor.registerEvent('backup_supernode_down');
reactor.registerEvent('fireNodeWelcomeBackEvent');
reactor.registerEvent('fireNodeGoodByeEvent');
reactor.registerEvent('primarySupernodeUpdatingLatestDataForItsUserFromOtherSupernodes');
reactor.registerEvent('nodeIsAlive');
reactor.registerEvent('get_node_status_request');
reactor.registerEvent('sync_primary_and_backup_db');
@ -21840,7 +21730,6 @@
break;
}
}
}
msg = `INFO: Supernode ${getFLOId} left.`;
@ -21852,62 +21741,6 @@
});
});
reactor.addEventListener('primarySupernodeUpdatingLatestDataForItsUserFromOtherSupernodes', async function(params) {
let msg = '';
if (typeof params.requesting_user_id !== "string") {
msg = 'ERROR: Invalid User id provided in "primarySupernodeUpdatingLatestDataForItsUserFromOtherSupernodes" request.';
showMessage(msg);
throw new Error(msg);
}
let getPrimarySuObj = await localbitcoinplusplus.kademlia.determineClosestSupernode(params.requesting_user_id, 3);
let primarySupernode = getPrimarySuObj[0].data.id;
if (typeof primarySupernode !== "string") {
let msg = `WARNING: Failed to determine primary supernode for ${params.requesting_user_id}.`;
showMessage(msg);
throw new Error(msg);
}
let getNextClosestSuObjOfPrimarySupernode = await localbitcoinplusplus.kademlia.determineClosestSupernode("", 3, supernodeKBucket, primarySupernode);
if (localbitcoinplusplus.wallets.my_local_flo_address !== primarySupernode) return;
const RM_RPC = new localbitcoinplusplus.rpc;
const table_array = ["crypto_balances", "cash_balances"];
table_array.map(async tbl=>{
let record = await readDBbyIndex(tbl, 'trader_flo_address', params.requesting_user_id);
record.map(rec=>{
getNextClosestSuObjOfPrimarySupernode.map(nextSu=>{
if (nextSu.data.id !== primarySupernode) {
let nextSuConn = localbitcoinplusplus.newBackupDatabase.db[nextSu.data.id];
if(typeof nextSuConn !== "object") {
let msg = `WARNING: Failed to open a backup DB with Supernode ${nextSu}.`;
showMessage(msg);
throw new Error(msg);
}
RM_RPC
.send_rpc
.call(this, "sync_data_by_vector_clock", {
trader_flo_address: params.requesting_user_id,
receiver_flo_address: nextSu.data.id,
leaving_supernode_flo_id: primarySupernode,
data: rec,
dbTable: tbl
}).then(server_response=>doSend(server_response, nextSu.data.id));
}
});
});
});
});
reactor.addEventListener('nodeIsAlive', function(res_obj) {
try {
if (res_obj.params[0].JOB !== "ARE_YOU_ALIVE"