update supernode modules

- Auto Disk cleanup
- Data migration (node addition and node deletion)
- Minor bug fixes and typo corrections
sairajzero 2021-07-22 01:50:20 +05:30
parent 854e88dde3
commit bb21b1c436
4 changed files with 322 additions and 50 deletions

View File

@@ -96,7 +96,8 @@ function processTagFromUser(data) {
.map(d => data[d]).join("|");
if (!floCrypto.verifySign(hashcontent, data.sign, data.pubKey))
return reject("Invalid signature");
DB.tagData(closeNode, data.vectorClock, data.tag, data.time, data.pubKey, data.sign)
let tag = [null, undefined, ""].includes(data.tag) ? null : [data.tag].toString();
DB.tagData(closeNode, data.vectorClock, tag, data.time, data.pubKey, data.sign)
.then(result => resolve([result, 'TAG']))
.catch(error => reject(error))
})
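The guard added above normalizes empty-ish tags to SQL NULL before they reach DB.tagData. A minimal standalone sketch of the behaviour (normalizeTag is a hypothetical name, not part of the diff):

// For a single-element array, [x].toString() is equivalent to String(x),
// so the guard collapses null/undefined/"" to NULL and stringifies the rest.
const normalizeTag = tag => [null, undefined, ""].includes(tag) ? null : String(tag);
console.log(normalizeTag(""));          // null -> stored as SQL NULL
console.log(normalizeTag("verified"));  // "verified"
console.log(normalizeTag(42));          // "42"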

View File

@@ -31,14 +31,14 @@ const H_struct = {
SENDER_ID: "senderID",
RECEIVER_ID: "receiverID",
TYPE: "type",
APPLICATION: "application"
APPLICATION: "application",
TIME: "time",
PUB_KEY: "pubKey"
}
const B_struct = {
TIME: "time",
MESSAGE: "message",
SIGNATURE: "sign",
PUB_KEY: "pubKey",
COMMENT: "comment"
}
@@ -155,7 +155,7 @@ function Database(user, password, dbname, host = 'localhost') {
db.createTable = function(snID) {
return new Promise((resolve, reject) => {
let statement = "CREATE TABLE IF NOT EXISTS " + snID + " ( " +
let statement = "CREATE TABLE IF NOT EXISTS _" + snID + " ( " +
H_struct.VECTOR_CLOCK + " VARCHAR(50) NOT NULL, " +
H_struct.SENDER_ID + " CHAR(34) NOT NULL, " +
H_struct.RECEIVER_ID + " CHAR(34) NOT NULL, " +
@@ -182,7 +182,7 @@ function Database(user, password, dbname, host = 'localhost') {
db.dropTable = function(snID) {
return new Promise((resolve, reject) => {
let statement = "DROP TABLE " + snID;
let statement = "DROP TABLE _" + snID;
db.query(statement)
.then(result => resolve(result))
.catch(error => reject(error))
@@ -193,7 +193,7 @@ function Database(user, password, dbname, host = 'localhost') {
return new Promise((resolve, reject) => {
let attr = Object.keys(H_struct).map(a => H_struct[a]);
let values = attr.map(a => data[a]);
let statement = "INSERT INTO " + snID +
let statement = "INSERT INTO _" + snID +
" (" + attr.join(", ") + ", " + L_struct.STATUS + ", " + L_struct.LOG_TIME + ") " +
"VALUES (" + attr.map(a => '?').join(", ") + ", 1, " + Date.now() + ")";
data = Object.fromEntries(attr.map((a, i) => [
@@ -217,7 +217,7 @@ function Database(user, password, dbname, host = 'localhost') {
let attr = Object.keys(data);
let values = attr.map(a => data[a]).concat(vectorClock);
data[H_struct.VECTOR_CLOCK] = vectorClock; //also add vectorClock to resolve data
let statement = "UPDATE " + snID +
let statement = "UPDATE _" + snID +
" SET " + attr.map(a => a + "=?").join(", ") +
" WHERE " + H_struct.VECTOR_CLOCK + "=?";
db.query(statement, values)
@@ -254,7 +254,7 @@ function Database(user, password, dbname, host = 'localhost') {
//console.log(conditionArr);
let attr = Object.keys(H_struct).concat(Object.keys(B_struct))
let statement = "SELECT (" + attr.join(", ") + ")" +
" FROM " + snID +
" FROM _" + snID +
" WHERE " + conditionArr.join(" AND ") +
(request.mostRecent ? " LIMIT 1" : (" ORDER BY " + H_struct.VECTOR_CLOCK));
db.query(statement)
@@ -265,7 +265,7 @@ function Database(user, password, dbname, host = 'localhost') {
db.lastLogTime = function(snID) {
return new Promise((resolve, reject) => {
let statement = "SELECT MAX(" + L_struct.LOG_TIME + ") FROM " + snID;
let statement = "SELECT MAX(" + L_struct.LOG_TIME + ") FROM _" + snID;
db.query(statement)
.then(result => resolve(result))
.catch(error => reject(error))
@@ -284,7 +284,7 @@ function Database(user, password, dbname, host = 'localhost') {
db.getData = function(snID, logtime) {
return new Promise((resolve, reject) => {
let statement = "SELECT * FROM " + snID +
let statement = "SELECT * FROM _" + snID +
" WHERE " + L_struct.LOG_TIME + ">=" + logtime +
" ORDER BY " + L_struct.LOG_TIME;
db.query(statement)
@@ -299,7 +299,7 @@ function Database(user, password, dbname, host = 'localhost') {
let attr = Object.keys(H_struct).map(a => H_struct[a]).concat(u_attr);
let values = attr.map(a => data[a]);
let u_values = u_attr.map(a => data[a]);
let statement = "INSERT INTO " + snID +
let statement = "INSERT INTO _" + snID +
" (" + attr.join(", ") + ", " + L_struct.STATUS + ", " + L_struct.LOG_TIME + ") " +
"VALUES (" + attr.map(a => '?').join(", ") + ", 1, " + Date.now() + ") " +
"ON DUPLICATE KEY UPDATE " + u_attr.map(a => a + "=?").join(", ");
@@ -313,7 +313,7 @@ function Database(user, password, dbname, host = 'localhost') {
return new Promise((resolve, reject) => {
let attr = Object.keys(T_struct).map(a => T_struct[a]).concat(L_struct.LOG_TIME);
let values = attr.map(a => data[a]);
let statement = "UPDATE " + snID +
let statement = "UPDATE _" + snID +
" SET " + attr.map(a => a + "=?").join(", ") +
" WHERE " + H_struct.VECTOR_CLOCK + "=" + data[H_struct.VECTOR_CLOCK];
db.query(statement, values)
@@ -322,6 +322,31 @@ function Database(user, password, dbname, host = 'localhost') {
})
}
db.clearAuthorisedAppData = function(snID, app, adminID, subAdmins, timestamp) {
return new Promise((resolve, reject) => {
let statement = "DELETE FROM _" + snID +
" WHERE ( " + H_struct.TIME + "<? AND " +
H_struct.APPLICATION + "=? AND " +
T_struct.TAG + " IS NULL ) AND ( " +
H_struct.RECEIVER_ID + " != ? OR " +
H_struct.SENDER_ID + " NOT IN (" + subAdmins.map(a => "?").join(", ") + ") )";
db.query(statement, [timestamp, app, adminID].concat(subAdmins)) //values in placeholder order: time, app, receiverID(admin), senderIDs(sub-admins)
.then(result => resolve(result))
.catch(error => reject(error))
})
}
db.clearUnauthorisedAppData = function(snID, appList, timestamp) {
return new Promise((resolve, reject) => {
let statement = "DELETE FROM _" + snID +
"WHERE " + H_struct.TIME + "<? AND " +
H_struct.APPLICATION + " NOT IN (" + appList.map(a => "?").join(", ") + ")";
db.query(statement, [timestamp].concat(appList))
.then(result => resolve(result))
.catch(error => reject(error))
})
}
db.close = function() {
db.conn.end();
}
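The two cleanup helpers above back the new diskCleanUp routine (see the last file in this commit). A hedged usage sketch, with stand-in values for db, snID, appList, adminID and subAdmins:

// Sketch only: purge data older than the retention cutoff for one served node.
const cutoff = Date.now() - 24 * 60 * 60 * 1000; // assumed 1-day deleteDelay
// Drop everything belonging to apps that are no longer authorised:
db.clearUnauthorisedAppData(snID, Object.keys(appList), cutoff)
    .then(result => console.info(result))
    .catch(error => console.error(error));
// For an authorised app, drop old untagged data that was neither sent to the
// admin nor signed by a sub-admin:
db.clearAuthorisedAppData(snID, "exampleApp", adminID, subAdmins["exampleApp"], cutoff)
    .then(result => console.info(result))
    .catch(error => console.error(error));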

View File

@@ -6,7 +6,8 @@ const SUPERNODE_INDICATOR = '$',
//Message type
ORDER_BACKUP = "orderBackup",
STORE_BACKUP_DATA = "backupData",
//STORE_MIGRATED_DATA = "migratedData",
STORE_MIGRATED_DATA = "migratedData",
DELETE_MIGRATED_DATA = "migratedDelete",
//DELETE_BACKUP_DATA = "backupDelete",
TAG_BACKUP_DATA = "backupTag",
//EDIT_BACKUP_DATA = "backupEdit",
@@ -14,9 +15,11 @@ const SUPERNODE_INDICATOR = '$',
DATA_REQUEST = "dataRequest",
DATA_SYNC = "dataSync",
BACKUP_HANDSHAKE_INIT = "handshakeInitate",
BACKUP_HANDSHAKE_END = "handshakeEnd";
BACKUP_HANDSHAKE_END = "handshakeEnd",
RETRY_TIMEOUT = 5 * 60 * 1000, //5 mins
MIGRATE_WAIT_DELAY = 5 * 60 * 1000; //5 mins
var DB //container for database
var DB, refresher //container for database and refresher
//List of node backups stored
const _list = {};
@@ -135,6 +138,7 @@ packet_.parse = function(str) {
let packet = JSON.parse(str.substring(SUPERNODE_INDICATOR.length))
let curTime = Date.now();
if (packet.time > curTime - floGlobals.sn_config.delayDelta &&
packet.from in floGlobals.supernodes &&
floCrypto.verifySign(this.s(packet), packet.sign, floGlobals.supernodes[packet.from].pubKey)) {
if (!Array.isArray(packet.message))
packet.message = [packet.message];
@@ -187,6 +191,31 @@ function connectToNextNode() {
})
}
function connectToAliveNodes(nodes = null) {
if (!Array.isArray(nodes)) nodes = Object.keys(floGlobals.supernodes);
return new Promise((resolve, reject) => {
Promise.allSettled(nodes.map(n => connectToNode(n))).then(results => {
let ws_connections = {};
nodes.forEach((n, i) =>
ws_connections[n] = (results[i].status === "fulfilled") ? results[i].value : null);
resolve(ws_connections);
}).catch(error => reject(error))
})
}
//Connect to all given nodes [Array] (Default: All super-nodes)
function connectToAllActiveNodes(nodes = null) {
if (!Array.isArray(nodes)) nodes = Object.keys(floGlobals.supernodes);
return new Promise((resolve, reject) => {
Promise.allSettled(nodes.map(n => connectToActiveNode(n))).then(results => {
let ws_connections = {};
nodes.forEach((n, i) =>
ws_connections[n] = (results[i].status === "fulfilled") ? results[i].value : null);
resolve(ws_connections);
}).catch(error => reject(error))
})
}
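Both connectToAliveNodes and connectToAllActiveNodes share the same fan-out shape: settle every connection attempt, then build a node-ID-to-socket map in which failures become null instead of rejecting the whole batch. A condensed sketch of the pattern (fanOut is a hypothetical name):

// One dead node must not abort the batch, hence allSettled rather than all().
function fanOut(ids, connect) {
    return Promise.allSettled(ids.map(id => connect(id))).then(results =>
        Object.fromEntries(ids.map((id, i) =>
            [id, results[i].status === "fulfilled" ? results[i].value : null])));
}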
//-----PROCESS TASKS-----
//Tasks from next-node
@@ -213,6 +242,8 @@ function processTaskFromNextNode(packet) {
case STORE_BACKUP_DATA:
storeBackupData(task.data)
break;
default:
console.log("Invalid task type:" + task.type + "from next-node")
}
})
}
@@ -242,9 +273,11 @@ function processTaskFromPrevNode(packet) {
case DATA_SYNC:
dataSyncIndication(task.id, task.status, from)
break;
case INITIATE_REFRESH: //TODO
initiateRefresh()
case DELETE_MIGRATED_DATA:
deleteMigratedData(task, from, packet) //migration fields (snID, vectorClock, receiverID) sit on the task itself
break;
default:
console.log("Invalid task type:" + task.type + "from prev-node")
}
});
}
@@ -262,11 +295,14 @@ function processTaskFromSupernode(packet, ws) {
case BACKUP_HANDSHAKE_INIT:
handshakeMid(from, ws)
break;
/*
case STORE_MIGRATED_DATA: //TODO
case STORE_MIGRATED_DATA:
storeMigratedData(task.data)
break;
*/
case INITIATE_REFRESH:
initiateRefresh()
break;
default:
console.log("Invalid task type:" + task.type + "from super-node")
}
});
}
@@ -276,7 +312,7 @@ function processTaskFromSupernode(packet, ws) {
//Acknowledge handshake
function handshakeMid(id, ws) {
if (_prevNode.id) {
if (_prevNode.id && _prevNode.id in floGlobals.supernodes) {
if (kBucket.innerNodes(_prevNode.id, myFloID).includes(id)) {
//close existing prev-node connection
_prevNode.send(packet_.constuct({
@@ -319,24 +355,48 @@ function handshakeMid(id, ws) {
});
if (!req_sync.length && !new_order.length)
return; //No order change and no need for any data sync
Promise.all(req_sync.forEach(n => DB.createGetLastLog(n))).then(result => {
let tasks = [];
if (req_sync.length) {
else
handshakeMid.requestData(req_sync, new_order);
}
handshakeMid.requestData = function(req_sync, new_order) {
if (handshakeMid.timeout) {
clearTimeout(handshakeMid.timeout)
delete handshakeMid.timeout;
}
Promise.allSettled(req_sync.map(n => DB.createGetLastLog(n))).then(result => {
let tasks = [],
lastlogs = {},
failed = [],
order = [],
failed_order = [];
req_sync.forEach((s, i) => {
if (result[i].status === "fulfilled")
lastlogs[s] = result[i].value;
else
failed.push(s)
});
if (Object.keys(lastlogs).length)
tasks.push({
type: DATA_REQUEST,
nodes: Object.fromEntries(req_sync.map((k, i) => [k, result[i]]))
})
}
if (new_order.length) {
nodes: lastlogs
});
new_order.forEach(n => {
if (failed.includes(n))
failed_order.push(n)
else
order.push(n)
});
if (order.length)
tasks.push({
type: ORDER_BACKUP,
order: _list.get(new_order)
order: _list.get(order)
})
}
_nextNode.send(packet_.constuct(tasks));
}).catch(error => {
FATAL.RECONNECTION_REQUIRED //TODO
})
if (failed.length)
handshakeMid.timeout = setTimeout(_ => handshakeMid.requestData(failed, failed_order), RETRY_TIMEOUT)
});
}
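handshakeMid.requestData keeps a single pending timer and retries only the nodes whose last-log lookup failed, so one flaky table cannot stall the whole sync. The retry shape, reduced to a sketch (retryOnlyFailed is a hypothetical name; RETRY_TIMEOUT is the 5-minute constant defined above):

function retryOnlyFailed(items, attempt, delayMs) {
    Promise.allSettled(items.map(attempt)).then(results => {
        const failed = items.filter((_, i) => results[i].status === "rejected");
        if (failed.length) // re-queue only the failures, after the back-off delay
            setTimeout(() => retryOnlyFailed(failed, attempt, delayMs), delayMs);
    });
}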
//Complete handshake
@@ -444,6 +504,33 @@ function tagBackupData(data, from, packet) {
}
}
//Store (migrated) data
function storeMigratedData(data) {
let closestNode = kBucket.closestNode(data.receiverID);
if (_list.serving.includes(closestNode)) {
DB.storeData(closestNode, data);
_nextNode.send(packet_.constuct({
type: STORE_BACKUP_DATA,
data: data
}));
}
}
//Delete (migrated) data
function deleteMigratedData(data, from, packet) {
let closestNode = kBucket.closestNode(data.receiverID);
if (data.snID !== closestNode && _list.stored.includes(data.snID)) {
DB.deleteData(data.snID, data.vectorClock);
if (_list[data.snID] < floGlobals.sn_config.backupDepth &&
_nextNode.id !== from)
_nextNode.send(packet);
}
}
function initiateRefresh() {
refresher.invoke(false)
}
//Forward incoming to next node
function forwardToNextNode(mode, data) {
var modeMap = {
@@ -457,9 +544,143 @@ function forwardToNextNode(mode, data) {
}));
}
function dataMigration(node_change){
console.log("data migration")
//TODO
//Data migration processor
function dataMigration(node_change, flag) {
if (!Object.keys(node_change).length)
return;
console.log("Node list changed! Data migration required");
if (flag) dataMigration.intimateAllNodes(); //Intimate all nodes to invoke the refresher
let new_nodes = [],
del_nodes = [];
for (let n in node_change)
(node_change[n] ? new_nodes : del_nodes).push(n);
if (del_nodes.includes(_prevNode.id)) {
_list[_prevNode.id] = 0; //Temporary serve for the deleted node
_prevNode.close();
}
setTimeout(() => {
//reconnect next node if current next node is deleted
if (del_nodes.includes(_nextNode.id))
reconnectNextNode();
else { //reconnect next node if there are newly added nodes in between self and current next node
let innerNodes = kBucket.innerNodes(myFloID, _nextNode.id)
if (new_nodes.filter(n => innerNodes.includes(n)).length)
reconnectNextNode();
}
setTimeout(() => {
dataMigration.process_new(new_nodes);
dataMigration.process_del(del_nodes);
}, MIGRATE_WAIT_DELAY);
}, MIGRATE_WAIT_DELAY)
}
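The nested setTimeout calls stage the migration so the node ring can settle before any data moves. Roughly, with MIGRATE_WAIT_DELAY = 5 minutes:

// t+0       node_change detected; prev-node connection dropped if that node was deleted
// t+5 min   next-node connection re-evaluated (deleted, or bypassed by a newly added node)
// t+10 min  process_new / process_del actually copy and delete the data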
//data migration sub-process: Deleted nodes
dataMigration.process_del = async function(del_nodes) {
if (!del_nodes.length)
return;
let process_nodes = del_nodes.filter(n => _list.serving.includes(n))
if (process_nodes.length) {
connectToAllActiveNodes().then(ws_connections => {
let remaining = process_nodes.length;
process_nodes.forEach(n => {
DB.getData(n, 0).then(result => {
console.info(`START: Data migration for ${n}`);
//TODO: efficiently handle large number of data instead of loading all into memory
result.forEach(d => {
let closest = kBucket.closestNode(d.receiverID);
if (_list.serving.includes(closest)) {
DB.storeData(closest, d);
_nextNode.send(packet_.constuct({
type: STORE_BACKUP_DATA,
data: d
}));
} else
ws_connections[closest].send(packet_.constuct({
type: STORE_MIGRATED_DATA,
data: d
}));
})
console.info(`END: Data migration for ${n}`);
_list.delete(n);
DB.dropTable(n);
remaining--;
}).catch(error => { console.error(error); remaining--; }) //log and release the counter; no reject in this scope
});
const interval = setInterval(() => {
if (remaining <= 0) {
for (let c in ws_connections)
if (ws_connections[c])
ws_connections[c].close()
clearInterval(interval);
}
}, RETRY_TIMEOUT);
}).catch(error => console.error(error)) //no reject in this scope
}
del_nodes.forEach(n => {
if (!process_nodes.includes(n) && _list.stored.includes(n)) {
_list.delete(n);
DB.dropTable(n);
}
})
}
//data migration sub-process: Added nodes
dataMigration.process_new = async function(new_nodes) {
if (!new_nodes.length)
return;
connectToAllActiveNodes(new_nodes).then(ws_connections => {
let process_nodes = _list.serving,
remaining = process_nodes.length;
process_nodes.forEach(n => {
DB.getData(n, 0).then(result => {
//TODO: efficiently handle large number of data instead of loading all into memory
result.forEach(d => {
let closest = kBucket.closestNode(d.receiverID);
if (new_nodes.includes(closest)) {
if (_list.serving.includes(closest)) {
DB.storeData(closest, d);
_nextNode.send(packet_.constuct({
type: STORE_BACKUP_DATA,
data: d
}));
} else
ws_connections[closest].send(packet_.constuct({
type: STORE_MIGRATED_DATA,
data: d
}));
_nextNode.send(packet_.constuct({
type: DELETE_MIGRATED_DATA,
vectorClock: d.vectorClock,
receiverID: d.receiverID,
snID: n
}));
}
})
remaining--;
}).catch(error => { console.error(error); remaining--; }) //log and release the counter; no reject in this scope
});
const interval = setInterval(() => {
if (remaining <= 0) {
for (let c in ws_connections)
if (ws_connections[c])
ws_connections[c].close()
clearInterval(interval);
}
}, RETRY_TIMEOUT);
}).catch(error => console.error(error)) //no reject in this scope
}
dataMigration.intimateAllNodes = function() {
connectToAliveNodes().then(ws_connections => {
let packet = packet_.constuct({
type: INITIATE_REFRESH
})
for (let n in ws_connections)
if (ws_connections[n]) {
ws_connections[n].send(packet)
ws_connections[n].close()
}
}).catch(error => console.error(error)) //no reject in this scope
}
//-----EXPORTS-----
@@ -470,7 +691,10 @@ module.exports = {
dataMigration,
SUPERNODE_INDICATOR,
_list,
set DB(db){
set DB(db) {
DB = db
},
set refresher(r) {
refresher = r
}
}
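The setter-based exports keep this intra-node module free of circular requires: the entry point injects the database handle and the refresher once both exist. A hypothetical wiring sketch (the module path and db handle are illustrative, not from this commit):

const intra = require('./supernode-intra'); // hypothetical path
intra.DB = db;                  // inject the database handle
intra.refresher = refreshData;  // lets INITIATE_REFRESH tasks re-run the refresh cycle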

View File

@@ -34,6 +34,7 @@ function startNode() {
//Start Server
const server = new Server(config["port"], client, intra);
server.refresher = refreshData;
intra.refresher = refreshData;
}).catch(error => reject(error))
}).catch(error => reject(error))
}
@@ -51,13 +52,13 @@ function loadBase(DB) {
const refreshData = {
count: null,
base: null,
invoke() {
invoke(flag = true) {
this.count = floGlobals.sn_config.refreshDelay;
refreshBlockchainData(this.base).then(result => {
refreshBlockchainData(this.base, flag).then(result => {
console.log(result)
diskCleanUp()
diskCleanUp(this.base)
.then(result => console.info(result))
.catch(error => console.error(error))
.catch(warn => console.warn(warn))
}).catch(error => console.error(error))
},
get countdown() {
@@ -67,22 +68,22 @@ const refreshData = {
}
}
function refreshBlockchainData(base) {
function refreshBlockchainData(base, flag) {
return new Promise((resolve, reject) => {
readSupernodeConfigFromAPI(base).then(result => {
readSupernodeConfigFromAPI(base, flag).then(result => {
console.log(result)
kBucket.launch().then(result => {
console.log(result)
readAppSubAdminListFromAPI(base)
.then(result => console.log(result))
.catch(error => console.warn(error))
.catch(warn => console.warn(warn))
.finally(_ => resolve("Refreshed Data from blockchain"))
}).catch(error => reject(error))
}).catch(error => reject(error))
})
}
function readSupernodeConfigFromAPI(base) {
function readSupernodeConfigFromAPI(base, flag) {
return new Promise((resolve, reject) => {
floBlockchainAPI.readData(floGlobals.SNStorageID, {
ignoreOld: base.lastTx[floGlobals.SNStorageID],
@@ -140,7 +141,7 @@ function readSupernodeConfigFromAPI(base) {
});
//Process data migration if nodes are changed
if (Object.keys(node_change).length)
intra.dataMigration(node_change)
intra.dataMigration(node_change, flag)
resolve('Updated Supernode Configuration');
}).catch(error => reject(error))
})
@@ -177,7 +178,7 @@ function readAppSubAdminListFromAPI(base) {
}));
}
return new Promise((resolve, reject) => {
Promise.allSettled(results => {
Promise.allSettled(promises).then(results => {
if (results.reduce((a, r) => r.status === "rejected" ? ++a : a, 0)) {
let error = Object.fromEntries(results.filter(r => r.status === "rejected").map(r => r.reason));
console.error(JSON.stringify(error));
@@ -188,8 +189,29 @@
})
}
function diskCleanUp(){
//TODO: Clear all unauthorised data from before deleteDelay
function diskCleanUp(base) {
return new Promise((resolve, reject) => {
let time = Date.now() - base.sn_config.deleteDelay,
promises = [];
intra._list.serving.forEach(sn => {
//delete all when app is not authorised.
promises.push(DB.clearUnauthorisedAppData(sn, Object.keys(base.appList), time));
//for each authorised app: delete unofficial data (untagged, or unknown sender/receiver)
for (let app in base.appList)
promises.push(DB.clearAuthorisedAppData(sn, app, base.appList[app], base.subAdmins[app], time));
})
Promise.allSettled(promises).then(results => {
let failed = results.filter(r => r.status === "rejected").map(r => r.reason)
if (failed.length) {
console.error(JSON.stringify(failed));
let success = results.length - failed.length;
reject(`Disk clean-up process was only ${100 * success / results.length}% successful. (Success:${success}|Failed:${failed.length})`)
} else
resolve("Disk clean-up process finished successfully (100%)");
}).catch(error => reject(error))
})
}
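Putting this last file together: invoke() refreshes blockchain state and then runs the disk cleanup against the same base object. A hedged sketch of driving one cycle (the daily interval is an assumption; loadBase is assumed to resolve with the base object):

loadBase(DB).then(base => {
    refreshData.base = base;
    refreshData.invoke(); // refreshBlockchainData(base, true) followed by diskCleanUp(base)
    setInterval(() => refreshData.invoke(), 24 * 60 * 60 * 1000); // assumed daily re-run
});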
module.exports = startNode;