Backup Sync improvements

- Removed: immutable table type (tables previously treated as immutable are now handled as mutable tables by the backup feature)
- Backup sync-data from the master is now cached temporarily and processed only after all sync-data has been received (see the sketch below)
- Cached backup sync-data is processed in an ordered manner (to prevent foreign-key constraint errors, duplicated data, and re-adding of deleted data)
- Updated SQL schema
sairajzero 2022-02-08 06:26:03 +05:30
parent 3fb401a226
commit a06c5b4c7b
5 changed files with 158 additions and 112 deletions
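In short: the slave no longer applies sync packets as they arrive. It caches them (in the new _backupCache table) and replays the cache in dependency order once SYNC_END arrives. A minimal sketch of that replay order, with hypothetical helpers upsert(table, rows) and removeRows(rows) standing in for the actual updateTableData/deleteTableData further down:

// Replay order used after SYNC_END (sketch; helper names are illustrative):
// 1. Users rows first — other tables reference Users(floID) via foreign keys.
// 2. All remaining cached table data.
// 3. Deletions last, so previously deleted rows are not re-added by stale packets.
function replayCache(cache, upsert, removeRows) {
    const users = cache.filter(c => c.table === "Users");
    const others = cache.filter(c => c.table !== null && c.table !== "Users");
    const deletes = cache.filter(c => c.table === null); // delete packets carry no table name
    return Promise.allSettled(users.map(c => upsert("Users", c.data)))
        .then(() => Promise.allSettled(others.map(c => upsert(c.table, c.data))))
        .then(() => Promise.allSettled(deletes.map(c => removeRows(c.data))));
}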

View File

@@ -34,9 +34,11 @@ CREATE TABLE TrustedList(
/* User Data */
CREATE TABLE Users (
id INT NOT NULL AUTO_INCREMENT,
floID CHAR(34) NOT NULL,
pubKey CHAR(66) NOT NULL,
created DATETIME DEFAULT CURRENT_TIMESTAMP,
KEY(id),
PRIMARY KEY(floID)
);
@@ -82,11 +84,13 @@ CREATE TABLE UserTag (
/* User Requests */
CREATE TABLE Request_Log(
CREATE TABLE RequestLog(
id INT NOT NULL AUTO_INCREMENT,
floID CHAR(34) NOT NULL,
request TEXT NOT NULL,
sign TEXT NOT NULL,
request_time DATETIME DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(id),
FOREIGN KEY (floID) REFERENCES Users(floID)
);
@@ -159,25 +163,30 @@ CREATE TABLE OutputToken (
/* Transaction Data */
CREATE TABLE PriceHistory (
id INT NOT NULL AUTO_INCREMENT,
asset VARCHAR(64) NOT NULL,
rate FLOAT NOT NULL,
rec_time DATETIME DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(id),
FOREIGN KEY (asset) REFERENCES AssetList(asset)
);
CREATE TABLE TransactionHistory (
id INT NOT NULL AUTO_INCREMENT,
seller CHAR(34) NOT NULL,
buyer CHAR(34) NOT NULL,
asset VARCHAR(64) NOT NULL,
quantity FLOAT NOT NULL,
unitValue DECIMAL(10, 2) NOT NULL,
tx_time DATETIME DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(id),
FOREIGN KEY (buyer) REFERENCES Users(floID),
FOREIGN KEY (seller) REFERENCES Users(floID),
FOREIGN KEY (asset) REFERENCES AssetList(asset)
);
CREATE TABLE AuditTransaction(
id INT NOT NULL AUTO_INCREMENT,
rec_time DATETIME DEFAULT CURRENT_TIMESTAMP,
unit_price FLOAT NOT NULL,
quantity FLOAT NOT NULL,
@@ -193,6 +202,7 @@ CREATE TABLE AuditTransaction(
buyer_new_asset FLOAT NOT NULL,
buyer_old_cash FLOAT NOT NULL,
buyer_new_cash FLOAT NOT NULL,
PRIMARY KEY(id),
FOREIGN KEY (sellerID) REFERENCES Users(floID),
FOREIGN KEY (buyerID) REFERENCES Users(floID),
FOREIGN KEY (asset) REFERENCES AssetList(asset)
@@ -208,6 +218,14 @@ CREATE TABLE _backup (
PRIMARY KEY(t_name, id)
);
CREATE TABLE _backupCache(
id INT AUTO_INCREMENT,
t_name TINYTEXT,
data_cache LONGTEXT,
status BOOLEAN,
PRIMARY KEY(id)
);
CREATE TABLE sinkShares(
floID CHAR(34) NOT NULL,
share TEXT,
@@ -215,6 +233,20 @@ CREATE TABLE sinkShares(
PRIMARY KEY(floID)
);
CREATE TRIGGER Users_I AFTER INSERT ON Users
FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('Users', NEW.id) ON DUPLICATE KEY UPDATE mode=TRUE, timestamp=DEFAULT;
CREATE TRIGGER Users_U AFTER UPDATE ON Users
FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('Users', NEW.id) ON DUPLICATE KEY UPDATE mode=TRUE, timestamp=DEFAULT;
CREATE TRIGGER Users_D AFTER DELETE ON Users
FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('Users', OLD.id) ON DUPLICATE KEY UPDATE mode=NULL, timestamp=DEFAULT;
CREATE TRIGGER RequestLog_I AFTER INSERT ON RequestLog
FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('RequestLog', NEW.id) ON DUPLICATE KEY UPDATE mode=TRUE, timestamp=DEFAULT;
CREATE TRIGGER RequestLog_U AFTER UPDATE ON RequestLog
FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('RequestLog', NEW.id) ON DUPLICATE KEY UPDATE mode=TRUE, timestamp=DEFAULT;
CREATE TRIGGER RequestLog_D AFTER DELETE ON RequestLog
FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('RequestLog', OLD.id) ON DUPLICATE KEY UPDATE mode=NULL, timestamp=DEFAULT;
CREATE TRIGGER UserSession_I AFTER INSERT ON UserSession
FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('UserSession', NEW.id) ON DUPLICATE KEY UPDATE mode=TRUE, timestamp=DEFAULT;
CREATE TRIGGER UserSession_U AFTER UPDATE ON UserSession
@@ -283,4 +315,25 @@ FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('UserTag', NEW.id) ON DUPL
CREATE TRIGGER UserTag_U AFTER UPDATE ON UserTag
FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('UserTag', NEW.id) ON DUPLICATE KEY UPDATE mode=TRUE, timestamp=DEFAULT;
CREATE TRIGGER UserTag_D AFTER DELETE ON UserTag
FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('UserTag', OLD.id) ON DUPLICATE KEY UPDATE mode=NULL, timestamp=DEFAULT;
CREATE TRIGGER PriceHistory_I AFTER INSERT ON PriceHistory
FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('PriceHistory', NEW.id) ON DUPLICATE KEY UPDATE mode=TRUE, timestamp=DEFAULT;
CREATE TRIGGER PriceHistory_U AFTER UPDATE ON PriceHistory
FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('PriceHistory', NEW.id) ON DUPLICATE KEY UPDATE mode=TRUE, timestamp=DEFAULT;
CREATE TRIGGER PriceHistory_D AFTER DELETE ON PriceHistory
FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('PriceHistory', OLD.id) ON DUPLICATE KEY UPDATE mode=NULL, timestamp=DEFAULT;
CREATE TRIGGER AuditTransaction_I AFTER INSERT ON AuditTransaction
FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('AuditTransaction', NEW.id) ON DUPLICATE KEY UPDATE mode=TRUE, timestamp=DEFAULT;
CREATE TRIGGER AuditTransaction_U AFTER UPDATE ON AuditTransaction
FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('AuditTransaction', NEW.id) ON DUPLICATE KEY UPDATE mode=TRUE, timestamp=DEFAULT;
CREATE TRIGGER AuditTransaction_D AFTER DELETE ON AuditTransaction
FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('AuditTransaction', OLD.id) ON DUPLICATE KEY UPDATE mode=NULL, timestamp=DEFAULT;
CREATE TRIGGER TransactionHistory_I AFTER INSERT ON TransactionHistory
FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('TransactionHistory', NEW.id) ON DUPLICATE KEY UPDATE mode=TRUE, timestamp=DEFAULT;
CREATE TRIGGER TransactionHistory_U AFTER UPDATE ON TransactionHistory
FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('TransactionHistory', NEW.id) ON DUPLICATE KEY UPDATE mode=TRUE, timestamp=DEFAULT;
CREATE TRIGGER TransactionHistory_D AFTER DELETE ON TransactionHistory
FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('TransactionHistory', OLD.id) ON DUPLICATE KEY UPDATE mode=NULL, timestamp=DEFAULT;
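Each trigger above writes a changelog row into _backup: mode=TRUE flags an insert/update, mode=NULL flags a delete, and timestamp=DEFAULT refreshes the change time. A master can therefore answer "what changed since T?" with one query; a hedged sketch (the real queries live in send_dataSync/send_deleteSync):

// Sketch: reading the _backup changelog maintained by the triggers above.
// Column names (t_name, id, mode, timestamp) come from the schema; the
// helper itself is illustrative, not part of this commit.
function changesSince(DB, timestamp) {
    return DB.query(
        "SELECT t_name, id, mode, timestamp FROM _backup WHERE timestamp > ?",
        [timestamp]
    ).then(rows => ({
        upserts: rows.filter(r => r.mode !== null), // mode=TRUE: row added/updated
        deletes: rows.filter(r => r.mode === null)  // mode=NULL: row deleted
    }));
}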

View File

@@ -8,17 +8,17 @@ TRUNCATE InputToken;
TRUNCATE OutputFLO;
TRUNCATE OutputToken;
TRUNCATE PriceHistory;
TRUNCATE Request_Log;
TRUNCATE RequestLog;
TRUNCATE SellOrder;
TRUNCATE UserSession;
TRUNCATE UserTag;
TRUNCATE TransactionHistory;
TRUNCATE Vault;
TRUNCATE Users;
DELETE FROM Users;
/* Blockchain data */
TRUNCATE LastTx;
TRUNCATE NodeList;
TRUNCATE TrustedList;
TRUNCATE TagList;
TRUNCATE AssetList;
DELETE FROM TagList;
DELETE FROM AssetList;
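Users, TagList and AssetList switch from TRUNCATE to DELETE FROM because MySQL refuses to TRUNCATE a table that is referenced by a foreign key (error 1701), and all three are FK parents in this schema. The child tables must therefore be cleared first; roughly (table list abridged, helper illustrative):

// Children holding the foreign keys are truncated first; the FK parents are
// then emptied with DELETE, which MySQL does allow on referenced tables.
function resetDatabase(DB) {
    const children = ["RequestLog", "TransactionHistory", "PriceHistory"]; // reference Users/AssetList
    const parents = ["Users", "TagList", "AssetList"];                     // referenced by the above
    return children.reduce((p, t) => p.then(() => DB.query(`TRUNCATE ${t}`)), Promise.resolve())
        .then(() => parents.reduce((p, t) => p.then(() => DB.query(`DELETE FROM ${t}`)), Promise.resolve()));
}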

View File

@@ -21,8 +21,7 @@ function sendBackup(timestamp, ws) {
timestamp = timestamp.substring(0, timestamp.length - 1);
let promises = [
send_dataSync(timestamp, ws),
send_deleteSync(timestamp, ws),
send_dataImmutable(timestamp, ws)
send_deleteSync(timestamp, ws)
];
Promise.allSettled(promises).then(result => {
let failedSync = [];
@@ -65,7 +64,7 @@ function send_dataSync(timestamp, ws) {
.then(data => {
ws.send(JSON.stringify({
table,
command: "SYNC_ADD_UPDATE",
command: "SYNC_UPDATE",
data
}));
res(table);
@@ -79,7 +78,7 @@ function send_dataSync(timestamp, ws) {
let sync_needed = {};
result.forEach(r => r.t_name in sync_needed ? sync_needed[r.t_name].push(r.id) : sync_needed[r.t_name] = [r.id]);
ws.send(JSON.stringify({
command: "SYNC_ADD_UPDATE_HEADER",
command: "SYNC_HEADER",
add_data: result
}));
let promises = [];
@@ -100,43 +99,6 @@ function send_dataSync(timestamp, ws) {
});
}
function send_dataImmutable(timestamp, ws) {
const immutable_tables = {
Users: "created",
Request_Log: "request_time",
TransactionHistory: "tx_time",
PriceHistory: "rec_time"
};
const sendTable = (table, timeCol) => new Promise((res, rej) => {
DB.query(`SELECT * FROM ${table} WHERE ${timeCol} > ?`, [timestamp])
.then(data => {
ws.send(JSON.stringify({
table,
command: "SYNC_ADD_IMMUTABLE",
data
}));
res(table);
}).catch(error => {
console.error(error);
rej(table);
});
});
return new Promise((resolve, reject) => {
let promises = [];
for (let table in immutable_tables)
promises.push(sendTable(table, immutable_tables[table]));
Promise.allSettled(promises).then(result => {
let failedTables = [];
result.forEach(r => r.status === "rejected" ? failedTables.push(r.reason) : null);
if (failedTables.length)
reject(["dataImmutable", failedTables]);
else
resolve("dataImmutable");
});
})
}
//Shares
function generateNewSink() {
let sink = floCrypto.generateNewID();
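With send_dataImmutable removed, the wire protocol between master and slave reduces to four commands. Their shapes, reconstructed from the ws.send calls and the slave-side handler in this diff (not an authoritative spec; field values are placeholders):

// Message shapes as implied by this diff:
const syncMessages = [
    { command: "SYNC_HEADER", add_data: [{ t_name: "Users", id: 1, timestamp: "2022-02-08 06:26:03" }] },
    { command: "SYNC_UPDATE", table: "Users", data: [/* full rows of that table */] },
    { command: "SYNC_DELETE", delete_data: [{ t_name: "Users", id: 2, timestamp: "2022-02-08 06:26:03" }] },
    { command: "SYNC_END", status: true, info: null } // info is set only on failure
];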

View File

@@ -43,17 +43,7 @@ function stopSlaveProcess() {
function requestBackupSync(ws) {
return new Promise((resolve, reject) => {
const tables = {
Users: "created",
Request_Log: "request_time",
TransactionHistory: "tx_time",
//PriceHistory: "rec_time",
_backup: "timestamp"
};
let subs = [];
for (let t in tables)
subs.push(`SELECT MAX(${tables[t]}) as ts FROM ${t}`);
DB.query(`SELECT MAX(ts) as last_time FROM (${subs.join(' UNION ')}) AS Z`).then(result => {
DB.query('SELECT MAX(timestamp) as last_time FROM _backup').then(result => {
let request = {
floID: global.myFloID,
pubKey: myPubKey,
@@ -70,9 +60,9 @@ function requestBackupSync(ws) {
const requestInstance = {
ws: null,
delete_sync: null,
add_sync: null,
immutable_sync: null,
cache: null,
delete_count: null,
add_count: null,
delete_data: null,
add_data: null,
total_add: null,
@@ -101,6 +91,8 @@ requestInstance.open = function(ws = null) {
requestBackupSync(ws).then(request => {
self.request = request;
self.cache = [];
self.add_count = self.delete_count = 0;
self.ws = ws;
}).catch(error => console.error(error))
}
@@ -111,9 +103,9 @@ requestInstance.close = function() {
self.ws.close();
self.onetime = null;
self.ws = null;
self.delete_sync = null;
self.add_sync = null;
self.immutable_sync = null;
self.cache = null;
self.delete_count = null;
self.add_count = null;
self.delete_data = null;
self.add_data = null;
self.total_add = null;
@@ -124,7 +116,7 @@ requestInstance.close = function() {
function processDataFromMaster(message) {
try {
message = JSON.parse(message);
console.debug(message);
console.debug("Master:", message);
if (message.command.startsWith("SYNC"))
processBackupData(message);
else switch (message.command) {
@@ -174,35 +166,87 @@ function processBackupData(response) {
break;
case "SYNC_END":
if (response.status) {
if (self.total_add !== self.add_sync)
console.info(`Backup Sync Instance finished!, ${self.total_add - self.add_sync} packets not received.`);
if (self.total_add !== self.add_count)
console.info(`Backup Sync Instance finished!, ${self.total_add - self.add_count} packets not received.`);
else
console.info("Backup Sync Instance finished successfully");
updateBackupTable(self.add_data, self.delete_data)
storeBackupData(self.cache, self.add_data, self.delete_data);
} else
console.info("Backup Sync was not successful! Failed info: ", response.info);
self.close();
break;
case "SYNC_DELETE":
self.delete_data = response.delete_data;
self.delete_sync += 1;
deleteData(response.delete_data);
self.delete_count += 1;
self.cache.push(cacheBackupData(null, response.delete_data));
break;
case "SYNC_ADD_UPDATE_HEADER":
case "SYNC_HEADER":
self.add_data = response.add_data;
self.total_add = Object.keys(response.add_data).length;
break;
case "SYNC_ADD_UPDATE":
self.add_sync += 1;
addUpdateData(response.table, response.data);
break;
case "SYNC_ADD_IMMUTABLE":
self.immutable_sync += 1;
addImmutableData(response.table, response.data);
case "SYNC_UPDATE":
self.add_count += 1;
self.cache.push(cacheBackupData(response.table, response.data));
break;
}
}
const cacheBackupData = (tableName, dataCache) => new Promise((resolve, reject) => {
DB.query("INSERT INTO _backupCache (t_name, data_cache) VALUE (?, ?)", [tableName, JSON.stringify(dataCache)])
.then(_ => resolve(true)).catch(error => {
console.error(error);
reject(false);
})
});
function storeBackupData(cache_promises, add_header, delete_header) {
Promise.allSettled(cache_promises).then(_ => {
console.log("START: BackupCache -> Tables");
//Process 'Users' table 1st as it provides foreign key attribute to other tables
DB.query("SELECT * FROM _backupCache WHERE t_name=?", ["Users"]).then(data => {
Promise.allSettled(data.map(d => updateTableData("Users", JSON.parse(d.data_cache)))).then(result => {
storeBackupData.commit(data, result).then(_ => {
DB.query("SELECT * FROM _backupCache WHERE t_name IS NOT NULL").then(data => {
Promise.allSettled(data.map(d => updateTableData(d.t_name, JSON.parse(d.data_cache)))).then(result => {
storeBackupData.commit(data, result).then(_ => {
DB.query("SELECT * FROM _backupCache WHERE t_name IS NULL").then(data => {
Promise.allSettled(data.map(d => deleteTableData(JSON.parse(d.data_cache)))).then(result => {
storeBackupData.commit(data, result).then(_ => {
console.log("END: BackupCache -> Tables");
updateBackupTable(add_header, delete_header);
});
})
})
})
})
}).catch(error => {
console.error(error);
console.warn("ABORT: BackupCache -> Tables")
});
})
})
}).catch(error => {
console.error(error);
console.warn("ABORT: BackupCache -> Tables")
})
    }).catch(error => console.error(error))
}
storeBackupData.commit = function(data, result) {
let promises = [];
for (let i = 0; i < data.length; i++)
switch (result[i].status) {
case "fulfilled":
promises.push(DB.query("DELETE FROM _backupCache WHERE id=?", data[i].id));
break;
case "rejected":
console.error(result[i].reason);
promises.push(DB.query("UPDATE _backupCache SET status=FALSE WHERE id=?", data[i].id));
break;
}
return Promise.allSettled(promises);
}
function updateBackupTable(add_data, delete_data) {
//update _backup table for added data
DB.transaction(add_data.map(r => [
@@ -210,47 +254,34 @@ function updateBackupTable(add_data, delete_data) {
[r.t_name, r.id, validateValue(r.timestamp), validateValue(r.timestamp)]
])).then(_ => null).catch(error => console.error(error));
//update _backup table for deleted data
let del_queries = [];
delete_data.forEach(r => del_queries.push([]));
DB.transaction(delete_data.map(r => [
"INSERT INTO _backup (t_name, id, mode, timestamp) VALUES (?, ?, NULL, ?) ON DUPLICATE KEY UPDATE mode=NULL, timestamp=?",
[r.t_name, r.id, validateValue(r.timestamp), validateValue(r.timestamp)]
])).then(_ => null).catch(error => console.error(error));
}
function deleteData(data) {
let delete_needed = {};
data.forEach(r => r.t_name in delete_needed ? delete_needed[r.t_name].push(r.id) : delete_needed[r.t_name] = [r.id]);
let queries = [];
for (let table in delete_needed)
queries.push(`DELETE FROM ${table} WHERE id IN (${delete_needed[table]})`);
DB.transaction(queries).then(_ => null).catch(error => console.error(error));
function deleteTableData(data) {
return new Promise((resolve, reject) => {
let delete_needed = {};
data.forEach(r => r.t_name in delete_needed ? delete_needed[r.t_name].push(r.id) : delete_needed[r.t_name] = [r.id]);
let queries = [];
for (let table in delete_needed)
queries.push(`DELETE FROM ${table} WHERE id IN (${delete_needed[table]})`);
DB.transaction(queries).then(_ => resolve(true)).catch(error => reject(error));
})
}
function addUpdateData(table, data) {
if (!data.length)
return;
let cols = Object.keys(data[0]),
_mark = "(" + Array(cols.length).fill('?') + ")";
let values = data.map(r => cols.map(c => validateValue(r[c]))).flat();
let statement = `INSERT INTO ${table} (${cols}) VALUES ${Array(data.length).fill(_mark)} AS new` +
" ON DUPLICATE KEY UPDATE " + cols.filter(c => c != 'id').map(c => c + " = new." + c);
DB.query(statement, values).then(_ => null).catch(error => console.error(error));
}
function addImmutableData(table, data) {
if (!data.length)
return;
const primaryKeys = {
Users: "floID"
};
let cols = Object.keys(data[0]),
_mark = "(" + Array(cols.length).fill('?') + ")";
let values = data.map(r => cols.map(c => validateValue(r[c]))).flat();
let statement = `INSERT INTO ${table} (${cols}) VALUES ${Array(data.length).fill(_mark)}`;
if (table in primaryKeys)
statement += ` ON DUPLICATE KEY UPDATE ${primaryKeys[table]}=${primaryKeys[table]}`;
DB.query(statement, values).then(_ => null).catch(error => console.error(error));
function updateTableData(table, data) {
return new Promise((resolve, reject) => {
if (!data.length)
return resolve(null);
let cols = Object.keys(data[0]),
_mark = "(" + Array(cols.length).fill('?') + ")";
let values = data.map(r => cols.map(c => validateValue(r[c]))).flat();
let statement = `INSERT INTO ${table} (${cols}) VALUES ${Array(data.length).fill(_mark)} AS new` +
" ON DUPLICATE KEY UPDATE " + cols.map(c => c + " = new." + c);
DB.query(statement, values).then(_ => resolve(true)).catch(error => reject(error));
})
}
const validateValue = val => (typeof val === "string" && /\.\d{3}Z$/.test(val)) ? val.substring(0, val.length - 1) : val;
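updateTableData builds one multi-row upsert per cached packet using MySQL 8.0.19+'s INSERT ... AS new row-alias syntax. Unlike the removed addUpdateData, it also overwrites id on duplicates, presumably so ids stay aligned with the master when rows match on another unique key (e.g., Users' floID primary key). For two hypothetical Users rows the builder produces roughly:

// Illustrative input (field values are placeholders, not real data):
const rows = [
    { id: 1, floID: "FAAA...", pubKey: "02ab...", created: "2022-02-08 06:26:03" },
    { id: 2, floID: "FBBB...", pubKey: "03cd...", created: "2022-02-08 06:26:04" }
];
const cols = Object.keys(rows[0]),
    _mark = "(" + Array(cols.length).fill('?') + ")";
const statement = `INSERT INTO Users (${cols}) VALUES ${Array(rows.length).fill(_mark)} AS new` +
    " ON DUPLICATE KEY UPDATE " + cols.map(c => c + " = new." + c);
// => INSERT INTO Users (id,floID,pubKey,created) VALUES (?,?,?,?),(?,?,?,?) AS new
//    ON DUPLICATE KEY UPDATE id = new.id,floID = new.floID,pubKey = new.pubKey,created = new.created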

View File

@@ -57,7 +57,7 @@ function validateRequest(request, sign, pubKey) {
function storeRequest(floID, req_str, sign) {
console.debug(floID, req_str);
DB.query("INSERT INTO Request_Log (floID, request, sign) VALUES (?,?,?)", [floID, req_str, sign])
DB.query("INSERT INTO RequestLog (floID, request, sign) VALUES (?,?,?)", [floID, req_str, sign])
.then(_ => null).catch(error => console.error(error));
}