Bug fixes

- Changed some console.log to console.debug (Also added few more).
- Fixed: DB.lastLogTime not returning correct value
- Fixed: MySQL disconnection bug. Now using Pool connect for persistent connection.
- Fixed: connectNode not connecting properly due to a typo.
- Fixed: RECONNECT_NEXT_NODE const value not added
- Fixed: reconnectNextNode not triggered when only one node was online and a node comes online.
- Fixed: orderBackup not storing the order properly when less or just sufficient nodes available.
- Added: orderBackup to requesting data when sync is required
- Fixed: typo in sendStoredData
- Added: logInterval of _prevNode, _nextNode and _list
This commit is contained in:
sairajzero 2021-07-26 05:31:54 +05:30
parent 87a578e37b
commit 6b69469477
4 changed files with 99 additions and 34 deletions

View File

@ -28,10 +28,10 @@ function processIncomingData(data) {
else
return reject("Invalid Data-format");
process.then(result => {
console.log(result);
console.debug(result);
resolve(result);
}).catch(error => {
console.error(error);
console.debug(error);
reject(error);
});
};

View File

@ -56,11 +56,6 @@ const T_struct = {
function Database(user, password, dbname, host = 'localhost') {
const db = {};
db.query = (s, v) => new Promise((res, rej) => {
//console.log("\nSQL:",s, v);
const fn = ((e, r) => e ? rej(e) : res(r));
v ? db.conn.query(s, v, fn) : db.conn.query(s, fn);
});
db.createBase = function() {
return new Promise((resolve, reject) => {
@ -274,9 +269,10 @@ function Database(user, password, dbname, host = 'localhost') {
db.lastLogTime = function(snID) {
return new Promise((resolve, reject) => {
let statement = "SELECT MAX(" + L_struct.LOG_TIME + ") FROM _" + snID;
let attr = "MAX(" + L_struct.LOG_TIME + ")";
let statement = "SELECT " + attr + " FROM _" + snID;
db.query(statement)
.then(result => resolve(result))
.then(result => resolve(result[0][attr] || 0))
.catch(error => reject(error));
});
};
@ -356,22 +352,43 @@ function Database(user, password, dbname, host = 'localhost') {
});
};
db.close = function() {
db.conn.end();
};
Object.defineProperty(db, "connect", {
get: () => new Promise((resolve, reject) => {
db.pool.getConnection((error, conn) => {
if (error)
reject(error);
else
resolve(conn);
});
})
});
Object.defineProperty(db, "query", {
value: (sql, values) => new Promise((resolve, reject) => {
db.connect.then(conn => {
const fn = (err, res) => {
conn.release();
(err ? reject(err) : resolve(res));
};
if (values)
conn.query(sql, values, fn);
else
conn.query(sql, fn);
}).catch(error => reject(error));
})
});
return new Promise((resolve, reject) => {
let conn = mysql.createConnection({
db.pool = mysql.createPool({
host: host,
user: user,
password: password,
database: dbname
});
conn.connect((err) => {
if (err) return reject(err);
db.conn = conn;
db.connect.then(conn => {
conn.release();
resolve(db);
});
}).catch(error => reject(error));
});
};

View File

@ -16,6 +16,7 @@ const SUPERNODE_INDICATOR = '$',
DATA_SYNC = "dataSync",
BACKUP_HANDSHAKE_INIT = "handshakeInitate",
BACKUP_HANDSHAKE_END = "handshakeEnd",
RECONNECT_NEXT_NODE = "reconnectNextNode",
RETRY_TIMEOUT = 5 * 60 * 1000, //5 mins
MIGRATE_WAIT_DELAY = 5 * 60 * 1000; //5 mins
@ -153,7 +154,7 @@ function connectToNode(snID) {
return new Promise((resolve, reject) => {
if (!(snID in floGlobals.supernodes))
return reject(`${snID} is not a supernode`);
const ws = new WebSocket("wss://" + floGlobals.supernodes[nextNodeID].uri + "/");
const ws = new WebSocket("wss://" + floGlobals.supernodes[snID].uri + "/");
ws.on("error", () => reject(`${snID} is offline`));
ws.on('open', () => resolve(ws));
});
@ -220,6 +221,7 @@ function connectToAllActiveNodes(nodes = null) {
//Tasks from next-node
function processTaskFromNextNode(packet) {
console.debug("_nextNode: ", packet);
var {
from,
message
@ -251,6 +253,7 @@ function processTaskFromNextNode(packet) {
//Tasks from prev-node
function processTaskFromPrevNode(packet) {
console.debug("_prevNode: ", packet);
var {
from,
message
@ -285,6 +288,7 @@ function processTaskFromPrevNode(packet) {
//Tasks from any supernode
function processTaskFromSupernode(packet, ws) {
console.debug("superNode: ", packet);
var {
from,
message
@ -338,6 +342,8 @@ function handshakeMid(id, ws) {
type: BACKUP_HANDSHAKE_END
}));
};
if (!_nextNode.id)
reconnectNextNode();
//Reorder storelist
let nodes = kBucket.innerNodes(_prevNode.id, myFloID).concat(myFloID),
req_sync = [],
@ -410,7 +416,7 @@ function handshakeEnd() {
//Reconnect to next available node
function reconnectNextNode() {
if(_nextNode.ws)
if (_nextNode.ws)
_nextNode.close();
connectToNextNode()
.then(result => console.log(result))
@ -429,31 +435,67 @@ function reconnectNextNode() {
//Order the stored backup
function orderBackup(order) {
let new_order = [];
let cur_serve = kBucket.innerNodes(_prevNode.id, myFloID);
let new_order = [],
req_sync = [];
let cur_serve = kBucket.innerNodes(_prevNode.id, myFloID).concat(myFloID);
for (let n in order) {
if (order[n] + 1 !== _list[n]) {
if (!cur_serve.includes(n) && order[n] + 1 !== _list[n]) {
if (order[n] >= floGlobals.sn_config.backupDepth)
DB.dropTable(n).then(_ => null)
.catch(error => console.error(error))
.finally(_ => _list.delete(n));
else if (_list[n] !== 0 || !cur_serve.includes(n)) {
else {
if (_list[n] === undefined)
req_sync.push(n);
_list[n] = order[n] + 1;
new_order.push(n);
};
};
};
if (new_order.length) {
_nextNode.send(packet_.constuct({
type: ORDER_BACKUP,
order: _list.get(new_order)
}));
};
if (!req_sync.length && !new_order.length)
return; //No order change and no need for any data sync
else
orderBackup.requestData(req_sync, new_order);
};
orderBackup.requestData = function(req_sync, new_order) {
Promise.allSettled(req_sync.map(n => DB.createGetLastLog(n))).then(result => {
let
lastlogs = {},
failed = [],
order = [],
failed_order = [];
req_sync.forEach((s, i) => {
if (result[i].status === "fulfilled")
lastlogs[s] = result[i].value;
else
failed.push(s);
});
if (Object.keys(lastlogs).length)
_prevNode.send(packet_.constuct({
type: DATA_REQUEST,
nodes: lastlogs
}));
new_order.forEach(n => {
if (failed.includes(n))
failed_order.push(n);
else
order.push(n);
});
if (order.length) //TODO: maybe should wait for sync to finish?
_nextNode.send(packet_.constuct({
type: ORDER_BACKUP,
order: _list.get(order)
}));
if (failed.length)
setTimeout(_ => orderBackup.requestData(failed, failed_order), RETRY_TIMEOUT);
});
};
//Send stored data
function sendStoredData(lastlogs, node) {
for (n in lastlogs) {
for (let n in lastlogs) {
if (_list.stored.includes(n)) {
DB.getData(n, lastlogs[n]).then(result => {
node.send(packet_.constuct({
@ -461,13 +503,13 @@ function sendStoredData(lastlogs, node) {
id: n,
status: true
}));
console.log(`START: ${snID} data sync(send) to ${node.id}`);
console.log(`START: ${n} data sync(send) to ${node.id}`);
//TODO: efficiently handle large number of data instead of loading all into memory
result.forEach(d => node.send(packet_.constuct({
type: STORE_BACKUP_DATA,
data: d
})));
console.log(`END: ${snID} data sync(send) to ${node.id}`);
console.log(`END: ${n} data sync(send) to ${node.id}`);
node.send(packet_.constuct({
type: DATA_SYNC,
id: n,
@ -681,12 +723,17 @@ dataMigration.intimateAllNodes = function() {
});
};
const logInterval = setInterval(() => {
console.debug(_prevNode.id, _nextNode.id, _list.get());
}, RETRY_TIMEOUT);
//-----EXPORTS-----
module.exports = {
reconnectNextNode,
processTaskFromSupernode,
forwardToNextNode,
dataMigration,
logInterval,
SUPERNODE_INDICATOR,
_list,
set DB(db) {

View File

@ -13,7 +13,7 @@ module.exports = function Server(port, client, intra) {
let u = url.parse(req.url, true);
if (!u.search)
return res.end("");
console.log("GET:", u.search);
console.debug("GET:", u.search);
client.processRequestFromUser(u.query)
.then(result => res.end(JSON.stringify(result[0])))
.catch(error => res.end(error.toString()));
@ -22,7 +22,7 @@ module.exports = function Server(port, client, intra) {
let data = '';
req.on('data', chunk => data += chunk);
req.on('end', () => {
console.log("POST:", data);
console.debug("POST:", data);
//process the data storing
client.processIncomingData(data).then(result => {
res.end(JSON.stringify(result[0]));
@ -50,6 +50,7 @@ module.exports = function Server(port, client, intra) {
if (message.startsWith(intra.SUPERNODE_INDICATOR))
intra.processTaskFromSupernode(message, ws);
else {
console.debug("WS: ", message);
var request = JSON.parse(message);
client.processRequestFromUser(request)
.then(result => {