added code to sync primary or backup data based on hash, timestamp and vector clock filter

This commit is contained in:
Abhishek Sinha 2019-06-27 16:39:32 +05:30
parent da9c50cbfc
commit f251be29f0

View File

@ -9635,8 +9635,8 @@
btc_mainnet: "https://blockexplorer.com",
btc_testnet: "https://testnet.blockexplorer.com",
flo_mainnet: "http://flosight.duckdns.org",
flo_testnet: "http://testnet-flosight.duckdns.org"
//flo_testnet: "https://testnet.flocha.in"
//flo_testnet: "http://testnet-flosight.duckdns.org"
flo_testnet: "https://testnet.flocha.in"
},
writable: false,
configurable: false,
@ -10161,35 +10161,77 @@
sync_primary_supernode_from_backup_supernode: function (primary_su="", backup_su="") {
const RM_RPC = new localbitcoinplusplus.rpc;
RM_RPC.send_rpc.call(this,
"sync_primary_supernode_from_backup_supernode", {
"trader_flo_address": primary_su,
"job": "SYNC_PRIMARY_SUPERNODE_DB_WITH_BACKUP_SUPERNODE_DB",
"receiver_flo_address": backup_su,
}).then(sync_request=>{
if (typeof localbitcoinplusplus.backupWS[backup_su]=="object") {
doSend(sync_request, backup_su);
} else {
doSend(sync_request);
}
});
// RM_RPC.send_rpc.call(this,
// "sync_primary_supernode_from_backup_supernode", {
// "trader_flo_address": primary_su,
// "job": "SYNC_PRIMARY_SUPERNODE_DB_WITH_BACKUP_SUPERNODE_DB",
// "receiver_flo_address": backup_su,
// }).then(sync_request=>{
// if (typeof localbitcoinplusplus.backupWS[backup_su]=="object") {
// doSend(sync_request, backup_su);
// } else {
// doSend(sync_request);
// }
// });
// First check if you yourself have the right data to serve
// If not, either get the data or don't serve the users of
// that dead supernode.
const tableArray = ["deposit", "withdraw_cash", "withdraw_btc", "cash_balances",
"crypto_balances", "buyOrders", "sellOrders", "system_btc_reserves_private_keys"];
const su_db_data = await localbitcoinplusplus.actions.get_sharable_db_data(tableArray, primary_su);
const dbHashData = await localbitcoinplusplus.actions.getDBTablesLatestHashAndTimestamp(primary_su, su_db_data);
// Now you have db tables timestamp and tables hashes. Send it to other supernodes to check
// if you have the latest data. If you don't have the latest data, someone
// will send you the latest data which you can verify before updating.
RM_RPC
.send_rpc
.call(this, "do_you_have_latest_data_for_this_supernode", dbHashData)
.then(server_sync_response=>doSend(server_sync_response));
},
sync_backup_supernode_from_backup_supernode: function (requester="", receiver="", flo_addr_of_backup="") {
const RM_RPC = new localbitcoinplusplus.rpc;
RM_RPC.send_rpc.call(this,
"sync_backup_supernode_from_backup_supernode", {
"trader_flo_address": flo_addr_of_backup,
"job": "SYNC_BACKUP_SUPERNODE_DB_WITH_BACKUP_SUPERNODE_DB",
"receiver_flo_address": receiver,
"requester_flo_id": requester
}).then(sync_request=>{
if (typeof localbitcoinplusplus.backupWS[receiver]=="object") {
doSend(sync_request, receiver);
} else {
doSend(sync_request);
}
});
// RM_RPC.send_rpc.call(this,
// "sync_backup_supernode_from_backup_supernode", {
// "trader_flo_address": flo_addr_of_backup,
// "job": "SYNC_BACKUP_SUPERNODE_DB_WITH_BACKUP_SUPERNODE_DB",
// "receiver_flo_address": receiver,
// "requester_flo_id": requester
// }).then(sync_request=>{
// if (typeof localbitcoinplusplus.backupWS[receiver]=="object") {
// doSend(sync_request, receiver);
// } else {
// doSend(sync_request);
// }
// });
// First check if you yourself have the right data to serve
// If not, either get the data or don't serve the users of
// that dead supernode.
const tableArray = ["deposit", "withdraw_cash", "withdraw_btc", "cash_balances",
"crypto_balances", "buyOrders", "sellOrders", "system_btc_reserves_private_keys"];
const su_db_data = await localbitcoinplusplus.actions.get_sharable_db_data(tableArray, flo_addr_of_backup);
const dbHashData = await localbitcoinplusplus.actions.getDBTablesLatestHashAndTimestamp(flo_addr_of_backup, su_db_data);
// Now you have db tables timestamp and tables hashes. Send it to other supernodes to check
// if you have the latest data. If you don't have the latest data, someone
// will send you the latest data which you can verify before updating.
RM_RPC
.send_rpc
.call(this, "do_you_have_latest_data_for_this_supernode", dbHashData)
.then(server_sync_response=>doSend(server_sync_response));
},
get_sharable_db_data: async function (dbTableNamesArray, backup_db="") {
@ -10280,7 +10322,7 @@
const subjectDB = (supernode_flo_id==localbitcoinplusplus.wallets.my_local_flo_address) ? "":supernode_flo_id;
let dbDataOfSupernode;
let dbDataOfSupernode=actual_db_data;
if (actual_db_data==false) {
if (typeof tableArray!=="object") {
@ -10304,21 +10346,13 @@
const dbDataOfSupernodeStr = JSON.stringify(dbDataOfSupernode);
const dbDataOfSupernodeHash = Crypto.SHA256(dbDataOfSupernodeStr);
myArray["id"] = `SU_DB_${su}`;
myArray["id"] = `SU_DB_${supernode_flo_id}`;
myArray["DBHash"] = dbDataOfSupernodeHash;
myArray["trader_flo_address"] = localbitcoinplusplus.wallets.my_local_flo_address;
myArray["data_of"] = su;
myArray["trader_flo_address"] = supernode_flo_id;
myArray["data_of"] = supernode_flo_id;
myArray["higestTimestampList"] = {...higestTimestampList};
return myArray;
// const RM_RPC = new localbitcoinplusplus.rpc;
// RM_RPC
// .send_rpc
// .call(this, "do_you_have_latest_data_for_this_supernode", myArray)
// .then(supernodesDbHash_response=>
// doSend(supernodesDbHash_response));
}
return false;
@ -16689,8 +16723,7 @@
}
if(data.vectorClock > myOwnDBData.vectorClock) {
// You have old data, update respective DB.
data.increaseVectorClock = false;
updateinDB(dbTable, data).then(()=>{
updateinDB(dbTable, data, data.id, false, false).then(()=>{
showMessage(`INFO: Data updated in ${dbTable} for id ${data.id}.`);
});
}
@ -16730,7 +16763,7 @@
for (var prop in obj) {
if (!obj.hasOwnProperty(prop)) continue;
//if(obj[prop].)
await updateinDB(tableStoreName, obj[prop], obj[prop].id, true)
await updateinDB(tableStoreName, obj[prop], obj[prop].id, true, false)
.then(()=>{
showMessage(`INFO: "${tableStoreName}" datastore syncing is complete.`);
});
@ -16819,7 +16852,7 @@
for (var prop in obj) {
if (!obj.hasOwnProperty(prop)) continue;
await localbitcoinplusplus.newBackupDatabase.db[su_db_data.trader_flo_address]
.backup_updateinDB(tableStoreName, obj[prop], obj[prop].id, true)
.backup_updateinDB(tableStoreName, obj[prop], obj[prop].id, true, false)
.then(()=>{
showMessage(`INFO: "${tableStoreName}" datastore syncing is complete.`);
});
@ -19144,7 +19177,7 @@
res_obj.nodePubKey
);
if (isBalanceLegit) {
backup_server_db_instance.backup_updateinDB("deposits",
backup_server_db_instance.backup_updateinDB("deposit",
updateUserDepositsResponseObject.updatedTraderDepositObject);
if (localbitcoinplusplus.wallets.my_local_flo_address ==
updateUserDepositsResponseObject.trader_flo_address) {
@ -19235,7 +19268,7 @@
if (!obj.hasOwnProperty(prop)) continue;
await localbitcoinplusplus.newBackupDatabase.db[su_db_data.trader_flo_address]
.backup_updateinDB(tableStoreName, obj[prop], obj[prop]
.id, true).then(()=>{
.id, true, false).then(()=>{
showMessage(`INFO: "${tableStoreName}" datastore syncing is complete.`);
});
}
@ -19273,14 +19306,21 @@
localbitcoinplusplus.kademlia.determineClosestSupernode(res_obj.params[0].trader_flo_address)
.then(async my_closest_su_list=>{
const primarySupernodeOfThisUser = my_closest_su_list[0].data.id;
const backup_server_db_instance = localbitcoinplusplus.newBackupDatabase.db[primarySupernodeOfThisUser];
if(typeof backup_server_db_instance !== "object"
|| backup_server_db_instance==localbitcoinplusplus.wallets.my_local_flo_address) {
let backup_db_error_msg = `WARNING: Unknown DB instance. DB Backup failed.`;
showMessage(backup_db_error_msg);
throw new Error(backup_db_error_msg);
};
let _readAllDB = readAllDB;
if (typeof primarySupernodeOfThisUser=="string"
&& primarySupernodeOfThisUser.length>0
&& primarySupernodeOfThisUser !== localbitcoinplusplus.wallets.my_local_flo_address
) {
if (typeof localbitcoinplusplus.newBackupDatabase.db[primarySupernodeOfThisUser] == "object") {
const foreign_db = localbitcoinplusplus.newBackupDatabase.db[primarySupernodeOfThisUser];
_readAllDB = foreign_db.backup_readAllDB.bind(foreign_db);
} else {
err_msg = `WARNING: Invalid Backup DB Instance Id: ${primarySupernodeOfThisUser}.`;
showMessage(err_msg);
throw new Error(err_msg);
}
}
console.log(response_object);
@ -19293,18 +19333,18 @@
// If you have same data as the sender has, you don't need to return any data to him
if (dbHashData_from_my_db.DBHash===response_object.DBHash) return;
if (dbHashData_from_my_db.id===response_object.id) return;
if (dbHashData_from_my_db.data_of===response_object.data_of) return;
if (dbHashData_from_my_db.trader_flo_address===response_object.trader_flo_address) return;
if (dbHashData_from_my_db.id!=response_object.id) return;
if (dbHashData_from_my_db.data_of!=response_object.data_of) return;
let mismatched_fields = [];
for (var q in dbHashData_from_my_db) {
if (dbHashData_from_my_db.hasOwnProperty(q)
|| q=='higestTimestampList'
|| q=='id'
|| q=='data_of'
|| q=='trader_flo_address') {
&& q!='higestTimestampList'
&& q!='id'
&& q!='data_of'
&& q!="DBHash"
&& q!='trader_flo_address') {
if (dbHashData_from_my_db[q]!==response_object[q]) {
mismatched_fields.push(q);
}
@ -19315,24 +19355,37 @@
let latest_data = {};
mismatched_fields.map(mf=>{
backup_server_db_instance.backup_readAllDB(mf)
.then(res_data_obj=>{
let filtered_data = res_data_obj.filter(odho=>{
if (typeof odho.vectorClock=="number"
&& typeof response_object.higestTimestampList[`${mf}_HVC`] !=='undefined') {
return odho.vectorClock >= response_object.higestTimestampList[`${mf}_HVC`];
}
});
latest_data[mf] = {...filtered_data};
mismatched_fields.map(async mf=>{
const res_data_obj = await _readAllDB(mf)
let filtered_data = res_data_obj.filter(odho=>{
if (typeof odho.timestamp=="number"
&& typeof response_object.higestTimestampList[`${mf}_HVC`] !=='undefined') {
return odho.timestamp >= response_object.higestTimestampList[`${mf}_HVC`];
}
});
latest_data[mf] = filtered_data;
});
// Send the data back to sender
console.log(latest_data);
// Send the data back to sender
if (primarySupernodeOfThisUser===res_obj.globalParams.senderFloId) {
latest_data.trader_flo_address = primarySupernodeOfThisUser;
latest_data.receiver_flo_address = res_obj.globalParams.senderFloId;
RM_RPC
.send_rpc
.call(this, "sync_primary_supernode_from_backup_supernode_response", latest_data)
.then(server_sync_response=>doSend(server_sync_response));
} else {
latest_data.trader_flo_address = primarySupernodeOfThisUser;
latest_data.receiver_flo_address = res_obj.globalParams.senderFloId;
RM_RPC
.send_rpc
.call(this, "sync_backup_supernode_from_backup_supernode_response", latest_data)
.then(server_sync_response=>doSend(server_sync_response));
}
});
}
@ -19937,6 +19990,7 @@
async function addDB(tablename, dbObject) {
try {
if(typeof dbObject.vectorClock == "undefined") dbObject.vectorClock = 0;
dbObject.timestamp = + new Date();
let request = db.transaction([tablename], "readwrite")
let store = request.objectStore(tablename)
await store.add(dbObject);
@ -19948,17 +20002,18 @@
}
}
async function updateinDB(tablename, Obj, key, updateByVectorClock=false) {
async function updateinDB(tablename, Obj, key, updateByVectorClock=false, increaseVectorClock=true) {
// updateByVectorClock==true will not return Obj back.
// Return value will be undefined
try {
if(typeof Obj.vectorClock == "undefined") {
Obj.vectorClock = 0;
} else if(typeof Obj.increaseVectorClock == "boolean"
&& Obj.increaseVectorClock === false) {
Obj.timestamp = + new Date();
} else if(increaseVectorClock === false) {
// leave the vector clock field unchanged
} else {
Obj.vectorClock += 1;
Obj.timestamp = + new Date();
}
var request = db.transaction([tablename], "readwrite")
let store = request.objectStore(tablename)
@ -20310,6 +20365,8 @@
async backup_addDB(tablename, dbObject) {
try {
if(typeof dbObject.vectorClock == "undefined") dbObject.vectorClock = 0;
dbObject.timestamp = + new Date();
this.request = this.db.transaction([tablename], "readwrite")
let store = this.request.objectStore(tablename)
await store.add(dbObject);
@ -20321,15 +20378,16 @@
}
},
async backup_updateinDB(tablename, Obj, key, updateByVectorClock=false) {
async backup_updateinDB(tablename, Obj, key, updateByVectorClock=false, increaseVectorClock=true) {
try {
if(typeof Obj.vectorClock == "undefined") {
Obj.vectorClock = 0;
} else if(typeof Obj.increaseVectorClock == "boolean"
&& Obj.increaseVectorClock === false) {
Obj.timestamp = + new Date();
} else if(increaseVectorClock === false) {
// leave the vector clock field unchanged
} else {
Obj.vectorClock += 1;
Obj.timestamp = + new Date();
}
let that = this;
this.request = this.db.transaction([tablename], "readwrite")
@ -21244,12 +21302,11 @@
try {
let url = `${explorer}/api/addr/${trader_deposits.btc_address}/balance`;
console.log(url);
helper_functions.ajaxGet(url).then(balance => {
helper_functions.ajaxGet(url).then(async balance => {
if (!isNaN(balance) && parseFloat(balance) > 0) {
balance = Number(parseFloat(balance/decimal));
console.log(balance);
/************************ Case of dispute *****************/
if (trader_deposits.bitcoinToBePaid - balance > localbitcoinplusplus.master_configurations.btcTradeMargin) {
console.log(trader_deposits.bitcoinToBePaid, balance, localbitcoinplusplus.master_configurations
@ -21261,19 +21318,20 @@
} else {
//Deposit successful. Update user balance and status to 2. Its Private key can be
// now given to a random trader
const reserve_res = await _readDBbyIndex('system_btc_reserves_private_keys', 'btc_address', trader_deposits.btc_address)
if (typeof reserve_res == "object") {
reserve_res.map(reserves => {
reserves.balance = balance;
_updateinDB('system_btc_reserves_private_keys', reserves, reserves.id);
});
} else {
throw new Error(`ERROR: No such BTC reserves found.`);
}
trader_deposits.status = 2;
_updateinDB("deposit", trader_deposits, trader_deposits.trader_flo_address);
_readDBbyIndex('system_btc_reserves_private_keys', 'btc_address', trader_deposits.btc_address)
.then(function (reserve_res) {
if (typeof reserve_res == "object") {
reserve_res.map(reserves => {
reserves.balance = balance;
_updateinDB('system_btc_reserves_private_keys', reserves, reserves.id);
});
}
});
let trader_depositor_cash_id =
`${trader_deposits.trader_flo_address}_${trader_deposits.product}`;
let updatedCryptobalances = {
@ -21338,7 +21396,7 @@
.then(updateUsertraderDepositsRequestObject=>
doSend(updateUsertraderDepositsRequestObject));
const reservesObjectString = JSON.stringify(trader_deposits);
const reservesObjectString = JSON.stringify(reserve_res);
const reservesObjectStringHash = Crypto.SHA256(reservesObjectString);
const reservesObjectSign = RM_WALLET
@ -21347,10 +21405,10 @@
);
const updateUserBTCReservesRequestObject = {
updatedReservesObject: trader_deposits,
updatedReservesObject: reserve_res,
updatedBTCReservesObjectSign: reservesObjectSign,
trader_flo_address: trader_deposits.trader_flo_address,
receiver_flo_address: trader_deposits.trader_flo_address
trader_flo_address: reserve_res.trader_flo_address,
receiver_flo_address: reserve_res.trader_flo_address
}
RM_RPC