diff --git a/args/schema.sql b/args/schema.sql index 74bb07d..e397821 100644 --- a/args/schema.sql +++ b/args/schema.sql @@ -34,9 +34,11 @@ CREATE TABLE TrustedList( /* User Data */ CREATE TABLE Users ( + id INT NOT NULL AUTO_INCREMENT, floID CHAR(34) NOT NULL, pubKey CHAR(66) NOT NULL, created DATETIME DEFAULT CURRENT_TIMESTAMP, + KEY(id), PRIMARY KEY(floID) ); @@ -82,11 +84,13 @@ CREATE TABLE UserTag ( /* User Requests */ -CREATE TABLE Request_Log( +CREATE TABLE RequestLog( + id INT NOT NULL AUTO_INCREMENT, floID CHAR(34) NOT NULL, request TEXT NOT NULL, sign TEXT NOT NULL, request_time DATETIME DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(id), FOREIGN KEY (floID) REFERENCES Users(floID) ); @@ -159,25 +163,30 @@ CREATE TABLE OutputToken ( /* Transaction Data */ CREATE TABLE PriceHistory ( + id INT NOT NULL AUTO_INCREMENT, asset VARCHAR(64) NOT NULL, rate FLOAT NOT NULL, rec_time DATETIME DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(id), FOREIGN KEY (asset) REFERENCES AssetList(asset) ); CREATE TABLE TransactionHistory ( + id INT NOT NULL AUTO_INCREMENT, seller CHAR(34) NOT NULL, buyer CHAR(34) NOT NULL, asset VARCHAR(64) NOT NULL, quantity FLOAT NOT NULL, unitValue DECIMAL(10, 2) NOT NULL, tx_time DATETIME DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(id), FOREIGN KEY (buyer) REFERENCES Users(floID), FOREIGN KEY (seller) REFERENCES Users(floID), FOREIGN KEY (asset) REFERENCES AssetList(asset) ); CREATE TABLE AuditTransaction( + id INT NOT NULL AUTO_INCREMENT, rec_time DATETIME DEFAULT CURRENT_TIMESTAMP, unit_price FLOAT NOT NULL, quantity FLOAT NOT NULL, @@ -193,6 +202,7 @@ CREATE TABLE AuditTransaction( buyer_new_asset FLOAT NOT NULL, buyer_old_cash FLOAT NOT NULL, buyer_new_cash FLOAT NOT NULL, + PRIMARY KEY(id), FOREIGN KEY (sellerID) REFERENCES Users(floID), FOREIGN KEY (buyerID) REFERENCES Users(floID), FOREIGN KEY (asset) REFERENCES AssetList(asset) @@ -208,6 +218,14 @@ CREATE TABLE _backup ( PRIMARY KEY(t_name, id) ); +CREATE table _backupCache( + id INT AUTO_INCREMENT, + t_name TINYTEXT, + data_cache LONGTEXT, + status BOOLEAN, + PRIMARY KEY(id) +); + CREATE TABLE sinkShares( floID CHAR(34) NOT NULL, share TEXT, @@ -215,6 +233,20 @@ CREATE TABLE sinkShares( PRIMARY KEY(floID) ); +CREATE TRIGGER Users_I AFTER INSERT ON Users +FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('Users', NEW.id) ON DUPLICATE KEY UPDATE mode=TRUE, timestamp=DEFAULT; +CREATE TRIGGER Users_U AFTER UPDATE ON Users +FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('Users', NEW.id) ON DUPLICATE KEY UPDATE mode=TRUE, timestamp=DEFAULT; +CREATE TRIGGER Users_D AFTER DELETE ON Users +FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('Users', OLD.id) ON DUPLICATE KEY UPDATE mode=NULL, timestamp=DEFAULT; + +CREATE TRIGGER RequestLog_I AFTER INSERT ON RequestLog +FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('RequestLog', NEW.id) ON DUPLICATE KEY UPDATE mode=TRUE, timestamp=DEFAULT; +CREATE TRIGGER RequestLog_U AFTER UPDATE ON RequestLog +FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('RequestLog', NEW.id) ON DUPLICATE KEY UPDATE mode=TRUE, timestamp=DEFAULT; +CREATE TRIGGER RequestLog_D AFTER DELETE ON RequestLog +FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('RequestLog', OLD.id) ON DUPLICATE KEY UPDATE mode=NULL, timestamp=DEFAULT; + CREATE TRIGGER UserSession_I AFTER INSERT ON UserSession FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('UserSession', NEW.id) ON DUPLICATE KEY UPDATE mode=TRUE, timestamp=DEFAULT; CREATE TRIGGER UserSession_U AFTER UPDATE ON UserSession @@ -283,4 +315,25 @@ FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('UserTag', NEW.id) ON DUPL CREATE TRIGGER UserTag_U AFTER UPDATE ON UserTag FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('UserTag', NEW.id) ON DUPLICATE KEY UPDATE mode=TRUE, timestamp=DEFAULT; CREATE TRIGGER UserTag_D AFTER DELETE ON UserTag -FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('UserTag', OLD.id) ON DUPLICATE KEY UPDATE mode=NULL, timestamp=DEFAULT; \ No newline at end of file +FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('UserTag', OLD.id) ON DUPLICATE KEY UPDATE mode=NULL, timestamp=DEFAULT; + +CREATE TRIGGER PriceHistory_I AFTER INSERT ON PriceHistory +FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('PriceHistory', NEW.id) ON DUPLICATE KEY UPDATE mode=TRUE, timestamp=DEFAULT; +CREATE TRIGGER PriceHistory_U AFTER UPDATE ON PriceHistory +FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('PriceHistory', NEW.id) ON DUPLICATE KEY UPDATE mode=TRUE, timestamp=DEFAULT; +CREATE TRIGGER PriceHistory_D AFTER DELETE ON PriceHistory +FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('PriceHistory', OLD.id) ON DUPLICATE KEY UPDATE mode=NULL, timestamp=DEFAULT; + +CREATE TRIGGER AuditTransaction_I AFTER INSERT ON AuditTransaction +FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('AuditTransaction', NEW.id) ON DUPLICATE KEY UPDATE mode=TRUE, timestamp=DEFAULT; +CREATE TRIGGER AuditTransaction_U AFTER UPDATE ON AuditTransaction +FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('AuditTransaction', NEW.id) ON DUPLICATE KEY UPDATE mode=TRUE, timestamp=DEFAULT; +CREATE TRIGGER AuditTransaction_D AFTER DELETE ON AuditTransaction +FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('AuditTransaction', OLD.id) ON DUPLICATE KEY UPDATE mode=NULL, timestamp=DEFAULT; + +CREATE TRIGGER TransactionHistory_I AFTER INSERT ON TransactionHistory +FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('TransactionHistory', NEW.id) ON DUPLICATE KEY UPDATE mode=TRUE, timestamp=DEFAULT; +CREATE TRIGGER TransactionHistory_U AFTER UPDATE ON TransactionHistory +FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('TransactionHistory', NEW.id) ON DUPLICATE KEY UPDATE mode=TRUE, timestamp=DEFAULT; +CREATE TRIGGER TransactionHistory_D AFTER DELETE ON TransactionHistory +FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('TransactionHistory', OLD.id) ON DUPLICATE KEY UPDATE mode=NULL, timestamp=DEFAULT; diff --git a/args/truncateAll.sql b/args/truncateAll.sql index 1d3dfed..fea8f49 100644 --- a/args/truncateAll.sql +++ b/args/truncateAll.sql @@ -8,17 +8,17 @@ TRUNCATE InputToken; TRUNCATE OutputFLO; TRUNCATE OutputToken; TRUNCATE PriceHistory; -TRUNCATE Request_Log; +TRUNCATE RequestLog; TRUNCATE SellOrder; TRUNCATE UserSession; TRUNCATE UserTag; TRUNCATE TransactionHistory; TRUNCATE Vault; -TRUNCATE Users; +DELETE FROM Users; /* Blockchain data */ TRUNCATE LastTx; TRUNCATE NodeList; TRUNCATE TrustedList; -TRUNCATE TagList; -TRUNCATE AssetList; \ No newline at end of file +DELETE FROM TagList; +DELETE FROM AssetList; \ No newline at end of file diff --git a/src/backup/head.js b/src/backup/head.js index d924c25..c32ae96 100644 --- a/src/backup/head.js +++ b/src/backup/head.js @@ -21,8 +21,7 @@ function sendBackup(timestamp, ws) { timestamp = timestamp.substring(0, timestamp.length - 1); let promises = [ send_dataSync(timestamp, ws), - send_deleteSync(timestamp, ws), - send_dataImmutable(timestamp, ws) + send_deleteSync(timestamp, ws) ]; Promise.allSettled(promises).then(result => { let failedSync = []; @@ -65,7 +64,7 @@ function send_dataSync(timestamp, ws) { .then(data => { ws.send(JSON.stringify({ table, - command: "SYNC_ADD_UPDATE", + command: "SYNC_UPDATE", data })); res(table); @@ -79,7 +78,7 @@ function send_dataSync(timestamp, ws) { let sync_needed = {}; result.forEach(r => r.t_name in sync_needed ? sync_needed[r.t_name].push(r.id) : sync_needed[r.t_name] = [r.id]); ws.send(JSON.stringify({ - command: "SYNC_ADD_UPDATE_HEADER", + command: "SYNC_HEADER", add_data: result })); let promises = []; @@ -100,43 +99,6 @@ function send_dataSync(timestamp, ws) { }); } -function send_dataImmutable(timestamp, ws) { - const immutable_tables = { - Users: "created", - Request_Log: "request_time", - TransactionHistory: "tx_time", - PriceHistory: "rec_time" - }; - const sendTable = (table, timeCol) => new Promise((res, rej) => { - DB.query(`SELECT * FROM ${table} WHERE ${timeCol} > ?`, [timestamp]) - .then(data => { - ws.send(JSON.stringify({ - table, - command: "SYNC_ADD_IMMUTABLE", - data - })); - res(table); - }).catch(error => { - console.error(error); - rej(table); - }); - }); - - return new Promise((resolve, reject) => { - let promises = []; - for (let table in immutable_tables) - promises.push(sendTable(table, immutable_tables[table])); - Promise.allSettled(promises).then(result => { - let failedTables = []; - result.forEach(r => r.status === "rejected" ? failedTables.push(r.reason) : null); - if (failedTables.length) - reject(["dataImmutable", failedTables]); - else - resolve("dataImmutable"); - }); - }) -} - //Shares function generateNewSink() { let sink = floCrypto.generateNewID(); diff --git a/src/backup/slave.js b/src/backup/slave.js index 51d9818..4ab0ffa 100644 --- a/src/backup/slave.js +++ b/src/backup/slave.js @@ -43,17 +43,7 @@ function stopSlaveProcess() { function requestBackupSync(ws) { return new Promise((resolve, reject) => { - const tables = { - Users: "created", - Request_Log: "request_time", - TransactionHistory: "tx_time", - //PriceHistory: "rec_time", - _backup: "timestamp" - }; - let subs = []; - for (let t in tables) - subs.push(`SELECT MAX(${tables[t]}) as ts FROM ${t}`); - DB.query(`SELECT MAX(ts) as last_time FROM (${subs.join(' UNION ')}) AS Z`).then(result => { + DB.query('SELECT MAX(timestamp) as last_time FROM _backup').then(result => { let request = { floID: global.myFloID, pubKey: myPubKey, @@ -70,9 +60,9 @@ function requestBackupSync(ws) { const requestInstance = { ws: null, - delete_sync: null, - add_sync: null, - immutable_sync: null, + cache: null, + delete_count: null, + add_count: null, delete_data: null, add_data: null, total_add: null, @@ -101,6 +91,8 @@ requestInstance.open = function(ws = null) { requestBackupSync(ws).then(request => { self.request = request; + self.cache = []; + self.add_count = self.delete_count = 0; self.ws = ws; }).catch(error => console.error(error)) } @@ -111,9 +103,9 @@ requestInstance.close = function() { self.ws.close(); self.onetime = null; self.ws = null; - self.delete_sync = null; - self.add_sync = null; - self.immutable_sync = null; + self.cache = null; + self.delete_count = null; + self.add_count = null; self.delete_data = null; self.add_data = null; self.total_add = null; @@ -124,7 +116,7 @@ requestInstance.close = function() { function processDataFromMaster(message) { try { message = JSON.parse(message); - console.debug(message); + console.debug("Master:", message); if (message.command.startsWith("SYNC")) processBackupData(message); else switch (message.command) { @@ -174,35 +166,87 @@ function processBackupData(response) { break; case "SYNC_END": if (response.status) { - if (self.total_add !== self.add_sync) - console.info(`Backup Sync Instance finished!, ${self.total_add - self.add_sync} packets not received.`); + if (self.total_add !== self.add_count) + console.info(`Backup Sync Instance finished!, ${self.total_add - self.add_count} packets not received.`); else console.info("Backup Sync Instance finished successfully"); - updateBackupTable(self.add_data, self.delete_data) + storeBackupData(self.cache, self.add_data, self.delete_data); } else console.info("Backup Sync was not successful! Failed info: ", response.info); self.close(); break; case "SYNC_DELETE": self.delete_data = response.delete_data; - self.delete_sync += 1; - deleteData(response.delete_data); + self.delete_count += 1; + self.cache.push(cacheBackupData(null, response.delete_data)); break; - case "SYNC_ADD_UPDATE_HEADER": + case "SYNC_HEADER": self.add_data = response.add_data; self.total_add = Object.keys(response.add_data).length; break; - case "SYNC_ADD_UPDATE": - self.add_sync += 1; - addUpdateData(response.table, response.data); - break; - case "SYNC_ADD_IMMUTABLE": - self.immutable_sync += 1; - addImmutableData(response.table, response.data); + case "SYNC_UPDATE": + self.add_count += 1; + self.cache.push(cacheBackupData(response.table, response.data)); break; } } +const cacheBackupData = (tableName, dataCache) => new Promise((resolve, reject) => { + DB.query("INSERT INTO _backupCache (t_name, data_cache) VALUE (?, ?)", [tableName, JSON.stringify(dataCache)]) + .then(_ => resolve(true)).catch(error => { + console.error(error); + reject(false); + }) +}); + +function storeBackupData(cache_promises, add_header, delete_header) { + Promise.allSettled(cache_promises).then(_ => { + console.log("START: BackupCache -> Tables"); + //Process 'Users' table 1st as it provides foreign key attribute to other tables + DB.query("SELECT * FROM _backupCache WHERE t_name=?", ["Users"]).then(data => { + Promise.allSettled(data.map(d => updateTableData("Users", JSON.parse(d.data_cache)))).then(result => { + storeBackupData.commit(data, result).then(_ => { + DB.query("SELECT * FROM _backupCache WHERE t_name IS NOT NULL").then(data => { + Promise.allSettled(data.map(d => updateTableData(d.t_name, JSON.parse(d.data_cache)))).then(result => { + storeBackupData.commit(data, result).then(_ => { + DB.query("SELECT * FROM _backupCache WHERE t_name IS NULL").then(data => { + Promise.allSettled(data.map(d => deleteTableData(JSON.parse(d.data_cache)))).then(result => { + storeBackupData.commit(data, result).then(_ => { + console.log("END: BackupCache -> Tables"); + updateBackupTable(add_header, delete_header); + }); + }) + }) + }) + }) + }).catch(error => { + console.error(error); + console.warn("ABORT: BackupCache -> Tables") + }); + }) + }) + }).catch(error => { + console.error(error); + console.warn("ABORT: BackupCache -> Tables") + }) + }).catch(error => reject(error)) +} + +storeBackupData.commit = function(data, result) { + let promises = []; + for (let i = 0; i < data.length; i++) + switch (result[i].status) { + case "fulfilled": + promises.push(DB.query("DELETE FROM _backupCache WHERE id=?", data[i].id)); + break; + case "rejected": + console.error(result[i].reason); + promises.push(DB.query("UPDATE _backupCache SET status=FALSE WHERE id=?", data[i].id)); + break; + } + return Promise.allSettled(promises); +} + function updateBackupTable(add_data, delete_data) { //update _backup table for added data DB.transaction(add_data.map(r => [ @@ -210,47 +254,34 @@ function updateBackupTable(add_data, delete_data) { [r.t_name, r.id, validateValue(r.timestamp), validateValue(r.timestamp)] ])).then(_ => null).catch(error => console.error(error)); //update _backup table for deleted data - let del_queries = []; - delete_data.forEach(r => del_queries.push([])); DB.transaction(delete_data.map(r => [ "INSERT INTO _backup (t_name, id, mode, timestamp) VALUES (?, ?, NULL, ?) ON DUPLICATE KEY UPDATE mode=NULL, timestamp=?", [r.t_name, r.id, validateValue(r.timestamp), validateValue(r.timestamp)] ])).then(_ => null).catch(error => console.error(error)); } -function deleteData(data) { - let delete_needed = {}; - data.forEach(r => r.t_name in delete_needed ? delete_needed[r.t_name].push(r.id) : delete_needed[r.t_name] = [r.id]); - let queries = []; - for (let table in delete_needed) - queries.push(`DELETE FROM ${table} WHERE id IN (${delete_needed[table]})`); - DB.transaction(queries).then(_ => null).catch(error => console.error(error)); +function deleteTableData(data) { + return new Promise((resolve, reject) => { + let delete_needed = {}; + data.forEach(r => r.t_name in delete_needed ? delete_needed[r.t_name].push(r.id) : delete_needed[r.t_name] = [r.id]); + let queries = []; + for (let table in delete_needed) + queries.push(`DELETE FROM ${table} WHERE id IN (${delete_needed[table]})`); + DB.transaction(queries).then(_ => resolve(true)).catch(error => reject(error)); + }) } -function addUpdateData(table, data) { - if (!data.length) - return; - let cols = Object.keys(data[0]), - _mark = "(" + Array(cols.length).fill('?') + ")"; - let values = data.map(r => cols.map(c => validateValue(r[c]))).flat(); - let statement = `INSERT INTO ${table} (${cols}) VALUES ${Array(data.length).fill(_mark)} AS new` + - " ON DUPLICATE KEY UPDATE " + cols.filter(c => c != 'id').map(c => c + " = new." + c); - DB.query(statement, values).then(_ => null).catch(error => console.error(error)); -} - -function addImmutableData(table, data) { - if (!data.length) - return; - const primaryKeys = { - Users: "floID" - }; - let cols = Object.keys(data[0]), - _mark = "(" + Array(cols.length).fill('?') + ")"; - let values = data.map(r => cols.map(c => validateValue(r[c]))).flat(); - let statement = `INSERT INTO ${table} (${cols}) VALUES ${Array(data.length).fill(_mark)}`; - if (table in primaryKeys) - statement += ` ON DUPLICATE KEY UPDATE ${primaryKeys[table]}=${primaryKeys[table]}`; - DB.query(statement, values).then(_ => null).catch(error => console.error(error)); +function updateTableData(table, data) { + return new Promise((resolve, reject) => { + if (!data.length) + return resolve(null); + let cols = Object.keys(data[0]), + _mark = "(" + Array(cols.length).fill('?') + ")"; + let values = data.map(r => cols.map(c => validateValue(r[c]))).flat(); + let statement = `INSERT INTO ${table} (${cols}) VALUES ${Array(data.length).fill(_mark)} AS new` + + " ON DUPLICATE KEY UPDATE " + cols.map(c => c + " = new." + c); + DB.query(statement, values).then(_ => resolve(true)).catch(error => reject(error)); + }) } const validateValue = val => (typeof val === "string" && /\.\d{3}Z$/.test(val)) ? val.substring(0, val.length - 1) : val; diff --git a/src/request.js b/src/request.js index dc04956..08371df 100644 --- a/src/request.js +++ b/src/request.js @@ -57,7 +57,7 @@ function validateRequest(request, sign, pubKey) { function storeRequest(floID, req_str, sign) { console.debug(floID, req_str); - DB.query("INSERT INTO Request_Log (floID, request, sign) VALUES (?,?,?)", [floID, req_str, sign]) + DB.query("INSERT INTO RequestLog (floID, request, sign) VALUES (?,?,?)", [floID, req_str, sign]) .then(_ => null).catch(error => console.error(error)); }