Bug fixes

This commit is contained in:
sairajzero 2021-08-06 04:42:39 +05:30
parent 25a342a2db
commit 50c820310a
3 changed files with 130 additions and 58 deletions

View File

@ -197,11 +197,15 @@ function Database(user, password, dbname, host = 'localhost') {
db.addData = function(snID, data) {
return new Promise((resolve, reject) => {
let attr = Object.keys(H_struct).map(a => H_struct[a]).concat(Object.keys(B_struct).map(a => B_struct[a]));
data[L_struct.STATUS] = 1;
data[L_struct.LOG_TIME] = Date.now();
let attr = Object.keys(H_struct).map(a => H_struct[a])
.concat(Object.keys(B_struct).map(a => B_struct[a]))
.concat(Object.keys(L_struct).map(a => L_struct[a]));
let values = attr.map(a => data[a]);
let statement = "INSERT INTO _" + snID +
" (" + attr.join(", ") + ", " + L_struct.STATUS + ", " + L_struct.LOG_TIME + ") " +
"VALUES (" + attr.map(a => '?').join(", ") + ", 1, " + Date.now() + ")";
" (" + attr.join(", ") + ") " +
"VALUES (" + attr.map(a => '?').join(", ") + ")";
data = Object.fromEntries(attr.map((a, i) => [a, values[i]]));
db.query(statement, values)
.then(result => resolve(data))
@ -298,15 +302,19 @@ function Database(user, password, dbname, host = 'localhost') {
});
};
db.storeData = function(snID, data) {
db.storeData = function(snID, data, updateLogTime = false) {
return new Promise((resolve, reject) => {
let u_attr = Object.keys(B_struct).map(a => B_struct[a]);
if (updateLogTime)
data[L_struct.LOG_TIME] = Date.now();
let u_attr = Object.keys(B_struct).map(a => B_struct[a])
.concat(Object.keys(L_struct).map(a => L_struct[a]))
.concat(Object.keys(T_struct).map(a => T_struct[a]));
let attr = Object.keys(H_struct).map(a => H_struct[a]).concat(u_attr);
let values = attr.map(a => data[a]);
let u_values = u_attr.map(a => data[a]);
let statement = "INSERT INTO _" + snID +
" (" + attr.join(", ") + ", " + L_struct.STATUS + ", " + L_struct.LOG_TIME + ") " +
"VALUES (" + attr.map(a => '?').join(", ") + ", 1, " + Date.now() + ") " +
" (" + attr.join(", ") + ") " +
"VALUES (" + attr.map(a => '?').join(", ") + ") " +
"ON DUPLICATE KEY UPDATE " + u_attr.map(a => a + "=?").join(", ");
db.query(statement, values.concat(u_values))
.then(result => resolve(data))
@ -327,14 +335,26 @@ function Database(user, password, dbname, host = 'localhost') {
});
};
db.deleteData = function(snID, vectorClock) {
return new Promise((resolve, reject) => {
let statement = "DELETE FROM _" + snID +
" WHERE " + H_struct.VECTOR_CLOCK + "=?";
db.query(statement, [vectorClock])
.then(result => resolve(result))
.catch(error => reject(error));
});
}
db.clearAuthorisedAppData = function(snID, app, adminID, subAdmins, timestamp) {
return new Promise((resolve, reject) => {
let statement = "DELETE FROM _" + snID +
" WHERE ( " + H_struct.TIME + "<? AND " +
H_struct.APPLICATION + "=? AND " +
T_struct.TAG + " IS NULL ) AND ( " +
H_struct.RECEIVER_ID + " != ? OR " +
H_struct.SENDER_ID + " NOT IN (" + subAdmins.map(a => "?").join(", ") + ") )";
T_struct.TAG + " IS NULL )" +
(subAdmins.length ? " AND ( " +
H_struct.RECEIVER_ID + " != ? OR " +
H_struct.SENDER_ID + " NOT IN (" + subAdmins.map(a => "?").join(", ") + ") )" :
"");
db.query(statement, [timestamp, app].concat(subAdmins).push(adminID))
.then(result => resolve(result))
.catch(error => reject(error));
@ -344,8 +364,10 @@ function Database(user, password, dbname, host = 'localhost') {
db.clearUnauthorisedAppData = function(snID, appList, timestamp) {
return new Promise((resolve, reject) => {
let statement = "DELETE FROM _" + snID +
"WHERE " + H_struct.TIME + "<? AND " +
H_struct.APPLICATION + " NOT IN (" + appList.map(a => "?").join(", ") + ")";
" WHERE " + H_struct.TIME + "<?" +
(appList.length ? " AND " +
H_struct.APPLICATION + " NOT IN (" + appList.map(a => "?").join(", ") + ")" :
"");
db.query(statement, [timestamp].concat(appList))
.then(result => resolve(result))
.catch(error => reject(error));

View File

@ -145,7 +145,8 @@ packet_.parse = function(str) {
if (!Array.isArray(packet.message))
packet.message = [packet.message];
return packet;
};
} else
return false;
} catch (error) {
console.error(str, error);
return false;
@ -184,26 +185,35 @@ function connectToActiveNode(snID, reverse = false) {
};
//Connect to next available node
function connectToNextNode() {
function connectToNextNode(curNode = myFloID) {
return new Promise((resolve, reject) => {
let nextNodeID = kBucket.nextNode(myFloID);
connectToActiveNode(nextNodeID).then(ws => {
if (curNode === myFloID && !(myFloID in floGlobals.supernodes))
return reject(`This (${myFloID}) is not a supernode`);
let nextNodeID = kBucket.nextNode(curNode);
if (nextNodeID === myFloID)
return reject("No other node online");
connectToNode(nextNodeID).then(ws => {
_nextNode.set(nextNodeID, ws);
_nextNode.send(packet_.construct({
type: BACKUP_HANDSHAKE_INIT
}));
resolve("BACKUP_HANDSHAKE_INIT: " + nextNodeID);
}).catch(error => reject(error));
}).catch(error => {
connectToNextNode(nextNodeID)
.then(result => resolve(result))
.catch(error => reject(error));
});
});
};
function connectToAliveNodes(nodes = null) {
if (!Array.isArray(nodes)) nodes = Object.keys(floGlobals.supernodes);
nodes = nodes.filter(n => n !== myFloID);
return new Promise((resolve, reject) => {
Promise.allSettled(nodes.map(n => connectToNode(n))).then(results => {
let ws_connections = {};
nodes.forEach((n, i) =>
ws_connections[n] = (results.status === "fulfilled") ? results[i].value : null);
ws_connections[n] = (results[i].status === "fulfilled") ? results[i].value : null);
resolve(ws_connections);
});
});
@ -216,7 +226,7 @@ function connectToAllActiveNodes(nodes = null) {
Promise.allSettled(nodes.map(n => connectToActiveNode(n))).then(results => {
let ws_connections = {};
nodes.forEach((n, i) =>
ws_connections[n] = (results.status === "fulfilled") ? results[i].value : null);
ws_connections[n] = (results[i].status === "fulfilled") ? results[i].value : null);
resolve(ws_connections);
});
});
@ -427,7 +437,7 @@ function reconnectNextNode() {
.then(result => console.log(result))
.catch(error => {
//Case: No other node is online
console.info("No other node online");
console.info(error);
//Serve all nodes
for (let sn in floGlobals.supernodes)
DB.createTable(sn)
@ -534,7 +544,7 @@ function dataSyncIndication(snID, status, from) {
function storeBackupData(data, from, packet) {
let closestNode = kBucket.closestNode(data.receiverID);
if (_list.stored.includes(closestNode)) {
DB.storeData(closestNode, data);
DB.storeData(closestNode, data).then(_ => null).catch(e => console.error(e));
if (_list[closestNode] < floGlobals.sn_config.backupDepth && _nextNode.id !== from)
_nextNode.send(packet);
};
@ -544,7 +554,7 @@ function storeBackupData(data, from, packet) {
function tagBackupData(data, from, packet) {
let closestNode = kBucket.closestNode(data.receiverID);
if (_list.stored.includes(closestNode)) {
DB.storeTag(closestNode, data);
DB.storeTag(closestNode, data).then(_ => null).catch(e => console.error(e));
if (_list[closestNode] < floGlobals.sn_config.backupDepth && _nextNode.id !== from)
_nextNode.send(packet);
};
@ -554,7 +564,7 @@ function tagBackupData(data, from, packet) {
function storeMigratedData(data) {
let closestNode = kBucket.closestNode(data.receiverID);
if (_list.serving.includes(closestNode)) {
DB.storeData(closestNode, data);
DB.storeData(closestNode, data, true).then(_ => null).catch(e => console.error(e));
_nextNode.send(packet_.construct({
type: STORE_BACKUP_DATA,
data: data
@ -566,14 +576,14 @@ function storeMigratedData(data) {
function deleteMigratedData(data, from, packet) {
let closestNode = kBucket.closestNode(data.receiverID);
if (data.snID !== closestNode && _list.stored.includes(data.snID)) {
DB.deleteData(data.snID, data.vectorClock);
DB.deleteData(data.snID, data.vectorClock).then(_ => null).catch(e => console.error(e));
if (_list[data.snID] < floGlobals.sn_config.backupDepth && _nextNode.id !== from)
_nextNode.send(packet);
};
};
function initiateRefresh() {
refresher.invoke(false).then(_ => null).catch(_ => null);
refresher.invoke(false).then(_ => null).catch(e => console.error(e));
};
//Forward incoming to next node
@ -599,19 +609,29 @@ function dataMigration(node_change, flag) {
del_nodes = [];
for (let n in node_change)
(node_change[n] ? new_nodes : del_nodes).push(n);
if (del_nodes.includes(_prevNode.id)) {
if (_prevNode.id && del_nodes.includes(_prevNode.id)) {
_list[_prevNode.id] = 0; //Temporary serve for the deleted node
_prevNode.close();
};
setTimeout(() => {
//reconnect next node if current next node is deleted
if (del_nodes.includes(_nextNode.id))
reconnectNextNode();
else { //reconnect next node if there are newly added nodes in between self and current next node
let innerNodes = kBucket.innerNodes(myFloID, _nextNode.id);
if (new_nodes.filter(n => innerNodes.includes(n)).length)
if (_nextNode.id) {
if (del_nodes.includes(_nextNode.id))
reconnectNextNode();
else { //reconnect next node if there are newly added nodes in between self and current next node
let innerNodes = kBucket.innerNodes(myFloID, _nextNode.id);
if (new_nodes.filter(n => innerNodes.includes(n)).length)
reconnectNextNode();
};
} else if (!_prevNode.id) {
//no other nodes online: server all new nodes too
new_nodes.forEach(n => {
DB.createTable(n)
.then(result => _list[n] = 0)
.catch(error => console.error(error));
});
};
setTimeout(() => {
dataMigration.process_new(new_nodes);
dataMigration.process_del(del_nodes);
@ -634,11 +654,12 @@ dataMigration.process_del = async function(del_nodes) {
result.forEach(d => {
let closest = kBucket.closestNode(d.receiverID);
if (_list.serving.includes(closest)) {
DB.storeData(closest, d);
_nextNode.send(packet_.construct({
type: STORE_BACKUP_DATA,
data: d
}));
DB.storeData(closest, d, true).then(_ => null).catch(e => console.error(e));
if (_nextNode.id)
_nextNode.send(packet_.construct({
type: STORE_BACKUP_DATA,
data: d
}));
} else
ws_connections[closest].send(packet_.construct({
type: STORE_MIGRATED_DATA,
@ -647,9 +668,9 @@ dataMigration.process_del = async function(del_nodes) {
});
console.log(`END: Data migration for ${n}`);
_list.delete(n);
DB.dropTable(n);
DB.dropTable(n).then(_ => null).catch(e => console.error(e));
remaining--;
}).catch(error => reject(error));
}).catch(error => console.error(error));
});
const interval = setInterval(() => {
if (remaining <= 0) {
@ -664,7 +685,7 @@ dataMigration.process_del = async function(del_nodes) {
del_nodes.forEach(n => {
if (!process_nodes.includes(n) && _list.stored.includes(n)) {
_list.delete(n);
DB.dropTable(n);
DB.dropTable(n).then(_ => null).catch(e => console.error(e));
};
});
};
@ -683,28 +704,31 @@ dataMigration.process_new = async function(new_nodes) {
let closest = kBucket.closestNode(d.receiverID);
if (new_nodes.includes(closest)) {
if (_list.serving.includes(closest)) {
DB.storeData(closest, d);
_nextNode.send(packet_.construct({
type: STORE_BACKUP_DATA,
data: d
}));
DB.storeData(closest, d, true).then(_ => null).catch(e => console.error(e));
if (_nextNode.id)
_nextNode.send(packet_.construct({
type: STORE_BACKUP_DATA,
data: d
}));
} else
ws_connections[closest].send(packet_.construct({
type: STORE_MIGRATED_DATA,
data: d
}));
_nextNode.send(packet_.construct({
type: DELETE_MIGRATED_DATA,
data: {
vectorClock: d.vectorClock,
receiverID: d.receiverID,
snID: n
}
}));
DB.deleteData(n, d.vectorClock).then(_ => null).catch(e => console.error(e));
if (_nextNode.id)
_nextNode.send(packet_.construct({
type: DELETE_MIGRATED_DATA,
data: {
vectorClock: d.vectorClock,
receiverID: d.receiverID,
snID: n
}
}));
};
});
remaining--;
}).catch(error => reject(error));
}).catch(error => console.error(error));
});
const interval = setInterval(() => {
if (remaining <= 0) {

View File

@ -34,8 +34,8 @@ function startNode() {
floGlobals.appList = base.appList;
floGlobals.appSubAdmins = base.appSubAdmins;
refreshData.base = base;
refreshData.invoke()
.then(_ => intra.reconnectNextNode()).catch(_ => null)
refreshData.invoke(null)
.then(_ => intra.reconnectNextNode()).catch(_ => null);
//Start Server
const server = new Server(config["port"], client, intra);
server.refresher = refreshData;
@ -59,10 +59,10 @@ const refreshData = {
base: null,
invoke(flag = true) {
return new Promise((resolve, reject) => {
this.count = floGlobals.sn_config.refreshDelay;
console.info("Refresher processor has started at " + Date());
refreshBlockchainData(this.base, flag).then(result => {
console.log(result);
this.count = floGlobals.sn_config.refreshDelay;
diskCleanUp(this.base)
.then(result => console.log(result))
.catch(warn => console.warn(warn))
@ -153,8 +153,13 @@ function readSupernodeConfigFromAPI(base, flag) {
console.warn("Some data might not have been saved in database correctly");
});
//Process data migration if nodes are changed
if (Object.keys(node_change).length)
intra.dataMigration(node_change, flag);
if (Object.keys(node_change).length) {
if (flag === null)
selfDiskMigration(node_change); //When node starts for the 1st time after been inactive, but migration has already taken place.
else
intra.dataMigration(node_change, flag);
}
resolve('Updated Supernode Configuration');
}).catch(error => reject(error));
});
@ -227,4 +232,25 @@ function diskCleanUp(base) {
});
};
function selfDiskMigration(node_change) {
DB.query("SHOW TABLES").then(result => {
const disks = [];
for (let i in result)
for (let j in result[i])
if (result[i][j].startsWith("_"))
disks.push(result[i][j].split("_")[1]);
disks.forEach(n => {
if (node_change[n] === false)
DB.dropTable(n).then(_ => null).catch(_ => null);
DB.getData(n, 0).then(result => {
result.forEach(d => {
let closest = kBucket.closestNode(d.receiverID);
if (closest !== n)
DB.deleteData(n, d.vectorClock).then(_ => null).catch(_ => null);
});
}).catch(error => console.error(error));
});
}).catch(error => console.error(error));
};
module.exports = startNode;